protect protected content

This commit is contained in:
Niko Storni 2022-07-29 04:59:15 +02:00
parent a1c2e92ca3
commit 5693529216
7 changed files with 110 additions and 5 deletions

View file

@ -43,6 +43,7 @@ var (
//upstream configuration //upstream configuration
upstreamReflector string upstreamReflector string
upstreamProtocol string upstreamProtocol string
upstreamEdgeToken string
//downstream configuration //downstream configuration
requestQueueSize int requestQueueSize int
@ -84,6 +85,7 @@ func init() {
cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from") cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from")
cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)") cmd.Flags().StringVar(&upstreamProtocol, "upstream-protocol", "http", "protocol used to fetch blobs from another upstream reflector server (tcp/http3/http)")
cmd.Flags().StringVar(&upstreamEdgeToken, "upstream-edge-token", "", "token used to retrieve/authenticate protected content")
cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)") cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)")
@ -130,7 +132,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
} }
defer http3PeerServer.Shutdown() defer http3PeerServer.Shutdown()
httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize) httpServer := http.NewServer(store.WithSingleFlight("sf-http", underlyingStoreWithCaches), requestQueueSize, upstreamEdgeToken)
err = httpServer.Start(":" + strconv.Itoa(httpPeerPort)) err = httpServer.Start(":" + strconv.Itoa(httpPeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -167,7 +169,7 @@ func initUpstreamStore() store.BlobStore {
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
}) })
case "http": case "http":
s = store.NewHttpStore(upstreamReflector) s = store.NewHttpStore(upstreamReflector, upstreamEdgeToken)
default: default:
log.Fatalf("protocol is not recognized: %s", upstreamProtocol) log.Fatalf("protocol is not recognized: %s", upstreamProtocol)
} }

View file

@ -0,0 +1,82 @@
package reflector
import (
"encoding/json"
"net/http"
"time"
"github.com/bluele/gcache"
"github.com/lbryio/lbry.go/v2/extras/errors"
"golang.org/x/sync/singleflight"
)
const protectedListURL = "https://api.odysee.com/file/list_protected"
type ProtectedContent struct {
SDHash string `json:"sd_hash"`
ClaimID string `json:"claim_id"`
}
var blockedCache = gcache.New(10).Expiration(2 * time.Minute).Build()
func GetBlockedContent() (map[string]bool, error) {
cachedVal, err := blockedCache.Get("protected")
if err == nil && cachedVal != nil {
return cachedVal.(map[string]bool), nil
}
method := "GET"
var r struct {
Success bool `json:"success"`
Error string `json:"error"`
Data []ProtectedContent `json:"data"`
}
client := &http.Client{}
req, err := http.NewRequest(method, protectedListURL, nil)
if err != nil {
return nil, errors.Err(err)
}
res, err := client.Do(req)
if err != nil {
return nil, errors.Err(err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, errors.Err("unexpected status code %d", res.StatusCode)
}
if err = json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, errors.Err(err)
}
if !r.Success {
return nil, errors.Prefix("file/list_protected API call", r.Error)
}
protectedMap := make(map[string]bool, len(r.Data))
for _, pc := range r.Data {
protectedMap[pc.SDHash] = true
}
err = blockedCache.Set("protected", protectedMap)
if err != nil {
return protectedMap, errors.Err(err)
}
return protectedMap, nil
}
var sf = singleflight.Group{}
func IsProtected(sdHash string) bool {
val, err, _ := sf.Do(sdHash, func() (interface{}, error) {
protectedMap, err := GetBlockedContent()
if err != nil {
return nil, err
}
return protectedMap[sdHash], nil
})
if err != nil {
return false
}
return val.(bool)
}

View file

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
@ -30,7 +31,13 @@ func (s *Server) HandleGetBlob(c *gin.Context) {
}() }()
start := time.Now() start := time.Now()
hash := c.Query("hash") hash := c.Query("hash")
edgeToken := c.Query("edge_token")
if reflector.IsProtected(hash) && edgeToken != s.edgeToken {
_ = c.Error(errors.Err("requested blob is protected"))
c.String(http.StatusForbidden, "requested blob is protected")
return
}
if s.missesCache.Has(hash) { if s.missesCache.Has(hash) {
serialized, err := shared.NewBlobTrace(time.Since(start), "http").Serialize() serialized, err := shared.NewBlobTrace(time.Since(start), "http").Serialize()
c.Header("Via", serialized) c.Header("Via", serialized)

View file

@ -21,15 +21,17 @@ type Server struct {
grp *stop.Group grp *stop.Group
concurrentRequests int concurrentRequests int
missesCache gcache.Cache missesCache gcache.Cache
edgeToken string
} }
// NewServer returns an initialized Server pointer. // NewServer returns an initialized Server pointer.
func NewServer(store store.BlobStore, requestQueueSize int) *Server { func NewServer(store store.BlobStore, requestQueueSize int, edgeToken string) *Server {
return &Server{ return &Server{
store: store, store: store,
grp: stop.New(), grp: stop.New(),
concurrentRequests: requestQueueSize, concurrentRequests: requestQueueSize,
missesCache: gcache.New(2000).Expiration(5 * time.Minute).ARC().Build(), missesCache: gcache.New(2000).Expiration(5 * time.Minute).ARC().Build(),
edgeToken: edgeToken,
} }
} }

View file

@ -15,6 +15,7 @@ import (
"time" "time"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
@ -181,7 +182,10 @@ func (s *Server) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
wantsTrace = false wantsTrace = false
} }
} }
if reflector.IsProtected(requestedBlob) {
http.Error(w, "requested blob is protected", http.StatusForbidden)
return
}
blob, trace, err := s.store.Get(requestedBlob) blob, trace, err := s.store.Get(requestedBlob)
if wantsTrace { if wantsTrace {

View file

@ -239,6 +239,9 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
if len(request.RequestedBlobs) > 0 { if len(request.RequestedBlobs) > 0 {
var availableBlobs []string var availableBlobs []string
for _, blobHash := range request.RequestedBlobs { for _, blobHash := range request.RequestedBlobs {
if reflector.IsProtected(blobHash) {
return nil, errors.Err("requested blob is protected")
}
exists, err := s.store.Has(blobHash) exists, err := s.store.Has(blobHash)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -20,12 +20,14 @@ import (
type HttpStore struct { type HttpStore struct {
upstream string upstream string
httpClient *http.Client httpClient *http.Client
edgeToken string
} }
func NewHttpStore(upstream string) *HttpStore { func NewHttpStore(upstream, edgeToken string) *HttpStore {
return &HttpStore{ return &HttpStore{
upstream: "http://" + upstream, upstream: "http://" + upstream,
httpClient: getClient(), httpClient: getClient(),
edgeToken: edgeToken,
} }
} }
@ -61,6 +63,9 @@ func (n *HttpStore) Has(hash string) (bool, error) {
func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now() start := time.Now()
url := n.upstream + "/blob?hash=" + hash url := n.upstream + "/blob?hash=" + hash
if n.edgeToken != "" {
url += "&token=" + n.edgeToken
}
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {