diff --git a/cmd/reflector.go b/cmd/reflector.go index d7b89aa..6dbbfb5 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -43,6 +43,7 @@ var ( //upstream configuration upstreamReflector string upstreamProtocol string + upstreamEdgeToken string //downstream configuration requestQueueSize int @@ -84,6 +85,7 @@ func init() { cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from") cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)") + cmd.Flags().StringVar(&upstreamEdgeToken, "upstream-edge-token", "", "token used to retrieve/authenticate protected content") cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)") @@ -130,7 +132,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } defer http3PeerServer.Shutdown() - httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize) + httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize, upstreamEdgeToken) err = httpServer.Start(":" + strconv.Itoa(httpPeerPort)) if err != nil { log.Fatal(err) @@ -167,7 +169,7 @@ func initUpstreamStore() store.BlobStore { Timeout: 30 * time.Second, }) case "http": - s = store.NewHttpStore(upstreamReflector) + s = store.NewHttpStore(upstreamReflector, upstreamEdgeToken) default: log.Fatalf("protocol is not recognized: %s", upstreamProtocol) } diff --git a/reflector/protected_content.go b/reflector/protected_content.go new file mode 100644 index 0000000..6c4d289 --- /dev/null +++ b/reflector/protected_content.go @@ -0,0 +1,82 @@ +package reflector + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/bluele/gcache" + "github.com/lbryio/lbry.go/v2/extras/errors" + "golang.org/x/sync/singleflight" +) + +const protectedListURL = "https://api.odysee.com/file/list_protected" + +type ProtectedContent struct { + SDHash string `json:"sd_hash"` + ClaimID string `json:"claim_id"` +} + +var blockedCache = gcache.New(10).Expiration(2 * time.Minute).Build() + +func GetBlockedContent() (map[string]bool, error) { + cachedVal, err := blockedCache.Get("protected") + if err == nil && cachedVal != nil { + return cachedVal.(map[string]bool), nil + } + + method := "GET" + var r struct { + Success bool `json:"success"` + Error string `json:"error"` + Data []ProtectedContent `json:"data"` + } + + client := &http.Client{} + req, err := http.NewRequest(method, protectedListURL, nil) + + if err != nil { + return nil, errors.Err(err) + } + res, err := client.Do(req) + if err != nil { + return nil, errors.Err(err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, errors.Err("unexpected status code %d", res.StatusCode) + } + if err = json.NewDecoder(res.Body).Decode(&r); err != nil { + return nil, errors.Err(err) + } + + if !r.Success { + return nil, errors.Prefix("file/list_protected API call", r.Error) + } + + protectedMap := make(map[string]bool, len(r.Data)) + for _, pc := range r.Data { + protectedMap[pc.SDHash] = true + } + err = blockedCache.Set("protected", protectedMap) + if err != nil { + return protectedMap, errors.Err(err) + } + return protectedMap, nil +} + +var sf = singleflight.Group{} + +func IsProtected(sdHash string) bool { + val, err, _ := sf.Do(sdHash, func() (interface{}, error) { + protectedMap, err := GetBlockedContent() + if err != nil { + return nil, err + } + return protectedMap[sdHash], nil + }) + if err != nil { + return false + } + return val.(bool) +} diff --git a/server/http/routes.go b/server/http/routes.go index 56586b6..77a30ce 100644 --- a/server/http/routes.go +++ b/server/http/routes.go @@ -6,6 +6,7 @@ import ( "time" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" @@ -30,7 +31,13 @@ func (s *Server) HandleGetBlob(c *gin.Context) { }() start := time.Now() hash := c.Query("hash") + edgeToken := c.Query("edge_token") + if reflector.IsProtected(hash) && edgeToken != s.edgeToken { + _ = c.Error(errors.Err("requested blob is protected")) + c.String(http.StatusForbidden, "requested blob is protected") + return + } if s.missesCache.Has(hash) { serialized, err := shared.NewBlobTrace(time.Since(start), "http").Serialize() c.Header("Via", serialized) diff --git a/server/http/server.go b/server/http/server.go index 33834e1..cb51150 100644 --- a/server/http/server.go +++ b/server/http/server.go @@ -21,15 +21,17 @@ type Server struct { grp *stop.Group concurrentRequests int missesCache gcache.Cache + edgeToken string } // NewServer returns an initialized Server pointer. -func NewServer(store store.BlobStore, requestQueueSize int) *Server { +func NewServer(store store.BlobStore, requestQueueSize int, edgeToken string) *Server { return &Server{ store: store, grp: stop.New(), concurrentRequests: requestQueueSize, missesCache: gcache.New(2000).Expiration(5 * time.Minute).ARC().Build(), + edgeToken: edgeToken, } } diff --git a/server/http3/server.go b/server/http3/server.go index 46fc12b..8def0aa 100644 --- a/server/http3/server.go +++ b/server/http3/server.go @@ -15,6 +15,7 @@ import ( "time" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -181,7 +182,10 @@ func (s *Server) HandleGetBlob(w http.ResponseWriter, r *http.Request) { wantsTrace = false } } - + if reflector.IsProtected(requestedBlob) { + http.Error(w, "requested blob is protected", http.StatusForbidden) + return + } blob, trace, err := s.store.Get(requestedBlob) if wantsTrace { diff --git a/server/peer/server.go b/server/peer/server.go index 064a47e..7de420b 100644 --- a/server/peer/server.go +++ b/server/peer/server.go @@ -239,6 +239,9 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { if len(request.RequestedBlobs) > 0 { var availableBlobs []string for _, blobHash := range request.RequestedBlobs { + if reflector.IsProtected(blobHash) { + return nil, errors.Err("requested blob is protected") + } exists, err := s.store.Has(blobHash) if err != nil { return nil, err diff --git a/store/http.go b/store/http.go index 8f9f403..038be81 100644 --- a/store/http.go +++ b/store/http.go @@ -20,12 +20,14 @@ import ( type HttpStore struct { upstream string httpClient *http.Client + edgeToken string } -func NewHttpStore(upstream string) *HttpStore { +func NewHttpStore(upstream, edgeToken string) *HttpStore { return &HttpStore{ upstream: "http://" + upstream, httpClient: getClient(), + edgeToken: edgeToken, } } @@ -61,6 +63,9 @@ func (n *HttpStore) Has(hash string) (bool, error) { func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { start := time.Now() url := n.upstream + "/blob?hash=" + hash + if n.edgeToken != "" { + url += "&token=" + n.edgeToken + } req, err := http.NewRequest("GET", url, nil) if err != nil {