From 4280edc4652df5a084439c9c34af86691e54529d Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Wed, 9 Jun 2021 20:04:06 -0400 Subject: [PATCH] cleanup and updates --- dev.sh | 2 +- main.go | 71 ++++++++----------------- server/search.go | 136 +++++++++++++++++++++++++++++++++++++---------- server/server.go | 61 ++------------------- 4 files changed, 136 insertions(+), 134 deletions(-) diff --git a/dev.sh b/dev.sh index 98e2823..78b6117 100755 --- a/dev.sh +++ b/dev.sh @@ -3,4 +3,4 @@ hash reflex 2>/dev/null || go get github.com/cespare/reflex hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; } -reflex --decoration=none --start-service=true -- sh -c "go run . serve" +reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev" diff --git a/main.go b/main.go index e4ed086..64792ca 100644 --- a/main.go +++ b/main.go @@ -19,51 +19,43 @@ import ( ) const ( + defaultHost = "0.0.0.0" defaultPort = "50051" - defaultRPCUser = "rpcuser" - defaultRPCPassword = "rpcpassword" defaultEsHost = "http://localhost" defaultEsPort = "9200" ) -type loginCreds struct { - Username, Password string -} -func (c *loginCreds) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { - return map[string]string{ - "username": c.Username, - "password": c.Password, - }, nil -} - -func (c *loginCreds) RequireTransportSecurity() bool { - return false -} - -func parseArgs(searchRequest *pb.SearchRequest) *server.Args { - getenvironment := func(data []string, getkeyval func(item string) (key, val string)) map[string]string { - items := make(map[string]string) - for _, item := range data { - key, val := getkeyval(item) - items[key] = val - } - return items +func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string { + items := make(map[string]string) + for _, item := range data { + key, val := getkeyval(item) + items[key] = val } - environment := getenvironment(os.Environ(), func(item string) (key, val string) { + return items +} + +func GetEnvironmentStandard() map[string]string { + return GetEnvironment(os.Environ(), func(item string) (key, val string) { splits := strings.Split(item, "=") key = splits[0] val = splits[1] return }) +} +func parseArgs(searchRequest *pb.SearchRequest) *server.Args { + + environment := GetEnvironmentStandard() parser := argparse.NewParser("hub", "hub server and client") serveCmd := parser.NewCommand("serve", "start the hub server") + host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "host", Default: defaultHost}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "port", Default: defaultPort}) - user := parser.String("", "rpcuser", &argparse.Options{Required: false, Help: "user", Default: defaultRPCUser}) - pass := parser.String("", "rpcpassword", &argparse.Options{Required: false, Help: "password", Default: defaultRPCPassword}) + esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "host", Default: defaultEsHost}) + esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "port", Default: defaultEsPort}) + dev := parser.Flag("", "dev", &argparse.Options{Required: false, Help: "port", Default: false}) text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"}) name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"}) @@ -84,11 +76,11 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args { args := &server.Args{ Serve: false, + Host: *host, Port: ":" + *port, - User: *user, - Pass: *pass, - EsHost: defaultEsHost, - EsPort: defaultEsPort, + EsHost: *esHost, + EsPort: *esPort, + Dev: *dev, } if esHost, ok := environment["ELASTIC_HOST"]; ok { @@ -143,9 +135,6 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args { func main() { searchRequest := &pb.SearchRequest{} - // - //res := schema.ParseURL("@abc#1111") - //log.Println(res) args := parseArgs(searchRequest) @@ -170,10 +159,6 @@ func main() { conn, err := grpc.Dial("localhost"+args.Port, grpc.WithInsecure(), grpc.WithBlock(), - //grpc.WithPerRPCCredentials(&loginCreds{ - // Username: args.User, - // Password: args.Pass, - //}), ) if err != nil { log.Fatalf("did not connect: %v", err) @@ -182,16 +167,6 @@ func main() { c := pb.NewHubClient(conn) - /* - var query string - if len(os.Args) > 1 { - query = strings.Join(os.Args[1:], " ") - } else { - log.Printf("error: no search query provided\n") - os.Exit(1) - } - */ - ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/server/search.go b/server/search.go index ba41cd6..4c18ac1 100644 --- a/server/search.go +++ b/server/search.go @@ -7,7 +7,6 @@ import ( "github.com/golang/protobuf/ptypes/wrappers" pb "github.com/lbryio/hub/protobuf/go" "math" - //"github.com/lbryio/hub/schema" "github.com/lbryio/hub/util" "github.com/olivere/elastic/v7" @@ -20,13 +19,30 @@ import ( ) type record struct { - Txid string `json:"tx_id"` - Nout uint32 `json:"tx_nout"` - Height uint32 `json:"height"` - ClaimId string `json:"claim_id"` - ChannelId string `json:"channel_id"` - CreationHeight uint32 `json:"creation_height"` - RepostedClaimId string `json:"reposted_claim_id"` + Txid string `json:"tx_id"` + Nout uint32 `json:"tx_nout"` + Height uint32 `json:"height"` + ClaimId string `json:"claim_id"` + ChannelId string `json:"channel_id"` + RepostedClaimId string `json:"reposted_claim_id"` + CensorType uint32 `json:"censor_type"` + CensoringChannelHash string `json:"censoring_channel_hash"` + ShortUrl string `json:"short_url"` + CanonicalUrl string `json:"canonical_url"` + IsControlling bool `json:"is_controlling"` + TakeOverHeight uint32 `json:"take_over_height"` + CreationHeight uint32 `json:"creation_height"` + ActivationHeight uint32 `json:"activation_height"` + ExpirationHeight uint32 `json:"expiration_height"` + ClaimsInChannel uint32 `json:"claims_in_channel"` + Reposted uint32 `json:"reposted"` + EffectiveAmount uint64 `json:"effective_amount"` + SupportAmount uint64 `json:"support_amount"` + TrendingGroup uint32 `json:"trending_group"` + TrendingMixed float32 `json:"trending_mixed"` + TrendingLocal float32 `json:"trending_local"` + TrendingGlobal float32 `json:"trending_global"` + Name string `json:"name"` } type orderField struct { @@ -379,24 +395,25 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, q = q.Must(textQuery) } - - //TODO make this only happen in dev environment - indices, err := client.IndexNames() - if err != nil { - log.Fatalln(err) - } - var numIndices = 0 - if len(indices) > 0 { - numIndices = len(indices) - 1 - } - searchIndices := make([]string, numIndices) - j := 0 - for i := 0; j < numIndices; i++ { - if indices[i] == "claims" { - continue + var searchIndices = []string{} + if s.Args.Dev { + indices, err := client.IndexNames() + if err != nil { + log.Fatalln(err) + } + var numIndices = 0 + if len(indices) > 0 { + numIndices = len(indices) - 1 + } + searchIndices = make([]string, numIndices) + j := 0 + for i := 0; j < numIndices; i++ { + if indices[i] == "claims" { + continue + } + searchIndices[j] = indices[i] + j = j + 1 } - searchIndices[j] = indices[i] - j = j + 1 } fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")//.Include("_id") @@ -419,6 +436,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, var txos []*pb.Output var records []*record + var blocked []*pb.Blocked = make([]*pb.Blocked, 0) records = make([]*record, 0, searchResult.TotalHits()) @@ -429,23 +447,46 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, } } + records = removeBlocked(records, &blocked) + if in.RemoveDuplicates != nil { records = removeDuplicates(records) } - if in.LimitClaimsPerChannel != nil { + if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 { records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) } finalLength := int(math.Min(float64(len(records)), float64(pageSize))) txos = make([]*pb.Output, 0, finalLength) - j = 0 + var j = 0 for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ { t := records[i] res := &pb.Output{ TxHash: util.ToHash(t.Txid), Nout: t.Nout, Height: t.Height, + Meta: &pb.Output_Claim{ + Claim: &pb.ClaimMeta{ + //Channel: + //Repost: + ShortUrl: t.ShortUrl, + CanonicalUrl: t.CanonicalUrl, + IsControlling: t.IsControlling, + TakeOverHeight: t.TakeOverHeight, + CreationHeight: t.CreationHeight, + ActivationHeight: t.ActivationHeight, + ExpirationHeight: t.ExpirationHeight, + ClaimsInChannel: t.ClaimsInChannel, + Reposted: t.Reposted, + EffectiveAmount: t.EffectiveAmount, + SupportAmount: t.SupportAmount, + TrendingGroup: t.TrendingGroup, + TrendingMixed: t.TrendingMixed, + TrendingLocal: t.TrendingLocal, + TrendingGlobal: t.TrendingGlobal, + }, + }, } txos = append(txos, res) j += 1 @@ -472,10 +513,24 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, // //return nil, nil //} + if in.NoTotals != nil && !in.NoTotals.Value { + return &pb.Outputs{ + Txos: txos, + Offset: uint32(int64(from) + searchResult.TotalHits()), + Blocked: blocked, + }, nil + } + + var blockedTotal uint32 = 0 + for _, b := range blocked { + blockedTotal += b.Count + } return &pb.Outputs{ Txos: txos, Total: uint32(searchResult.TotalHits()), Offset: uint32(int64(from) + searchResult.TotalHits()), + Blocked: blocked, + BlockedTotal: blockedTotal, }, nil } @@ -574,3 +629,30 @@ func removeDuplicates(searchHits []*record) []*record { return deduped } + +func removeBlocked(searchHits []*record, blocked *[]*pb.Blocked) []*record { + newHits := make([]*record, 0, len(searchHits)) + blockedChannels := make(map[string]*pb.Blocked) + for _, r := range searchHits { + if r.CensorType != 0 { + if blockedChannels[r.ChannelId] == nil { + blockedObj := &pb.Blocked{ + Count: 1, + Channel: &pb.Output{ + TxHash: util.ToHash(r.Txid), + Nout: r.Nout, + Height: r.Height, + }, + } + *blocked = append(*blocked, blockedObj) + blockedChannels[r.ChannelId] = blockedObj + } else { + blockedChannels[r.ChannelId].Count += 1 + } + } else { + newHits = append(newHits, r) + } + } + + return newHits +} diff --git a/server/server.go b/server/server.go index e018794..cdbbef5 100644 --- a/server/server.go +++ b/server/server.go @@ -1,11 +1,9 @@ package server import ( - "context" pb "github.com/lbryio/hub/protobuf/go" "github.com/olivere/elastic/v7" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" "log" "regexp" ) @@ -21,21 +19,11 @@ type Server struct { type Args struct { Serve bool + Host string Port string - User string - Pass string EsHost string EsPort string -} - -type AccessDeniedErr struct {} -func (AccessDeniedErr) Error() string { - return "Username or password incorrect." -} - -type EmptyMetadataErr struct {} -func (EmptyMetadataErr) Error() string { - return "No username or password specified." + Dev bool } /* @@ -77,9 +65,7 @@ func (EmptyMetadataErr) Error() string { 'blockchain.address.unsubscribe' */ -//func MakeHubServer(args Args) *grpc.Server { func MakeHubServer(args *Args) *Server { - // authorize := makeAuthorizeFunc(args.User, args.Pass) grpcServer := grpc.NewServer() multiSpaceRe, err := regexp.Compile("\\s{2,}") @@ -100,45 +86,4 @@ func MakeHubServer(args *Args) *Server { } return s - // grpc.StreamInterceptor(makeStreamInterceptor(authorize)), - // grpc.UnaryInterceptor(makeUnaryInterceptor(authorize)), - //) -} - -func makeStreamInterceptor(authorize func(context.Context) error) func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - if err := authorize(stream.Context()); err != nil { - return err - } - - return handler(srv, stream) - } -} - - -func makeUnaryInterceptor(authorize func(context.Context) error) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - if err := authorize(ctx); err != nil { - return nil, err - } - - return handler(ctx, req) - } -} - -func makeAuthorizeFunc(username string, password string) func(context.Context) error { - - return func(ctx context.Context) error { - if md, ok := metadata.FromIncomingContext(ctx); ok { - log.Println(md) - if len(md["username"]) > 0 && md["username"][0] == username && - len(md["password"]) > 0 && md["password"][0] == password { - return nil - } - - return AccessDeniedErr{} - } - - return EmptyMetadataErr{} - } -} +} \ No newline at end of file