diff --git a/cmd/reflector.go b/cmd/reflector.go index aacef08..49bef3e 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -76,7 +76,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { outerStore := wrapWithCache(underlyingStore, cleanerStopper) if !disableUploads { - reflectorServer := reflector.NewServer(underlyingStore) + reflectorServer := reflector.NewServer(underlyingStore, outerStore) reflectorServer.Timeout = 3 * time.Minute reflectorServer.EnableBlocklist = !disableBlocklist diff --git a/cmd/test.go b/cmd/test.go index a330856..afa2ff2 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -31,7 +31,7 @@ func testCmd(cmd *cobra.Command, args []string) { memStore := store.NewMemStore() - reflectorServer := reflector.NewServer(memStore) + reflectorServer := reflector.NewServer(memStore, memStore) reflectorServer.Timeout = 3 * time.Minute err := reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) diff --git a/prism/prism.go b/prism/prism.go index 12d0329..ee18ba1 100644 --- a/prism/prism.go +++ b/prism/prism.go @@ -79,7 +79,7 @@ func New(conf *Config) *Prism { dht: d, cluster: c, peer: peer.NewServer(conf.Blobs), - reflector: reflector.NewServer(conf.Blobs), + reflector: reflector.NewServer(conf.Blobs, conf.Blobs), grp: stop.New(), } diff --git a/reflector/server.go b/reflector/server.go index bbe3d33..592ba3d 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -40,16 +40,18 @@ type Server struct { EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled - store store.BlobStore - grp *stop.Group + underlyingStore store.BlobStore + outerStore store.BlobStore + grp *stop.Group } // NewServer returns an initialized reflector server pointer. -func NewServer(store store.BlobStore) *Server { +func NewServer(underlying store.BlobStore, outer store.BlobStore) *Server { return &Server{ - Timeout: DefaultTimeout, - store: store, - grp: stop.New(), + Timeout: DefaultTimeout, + underlyingStore: underlying, + outerStore: outer, + grp: stop.New(), } } @@ -85,7 +87,7 @@ func (s *Server) Start(address string) error { }() if s.EnableBlocklist { - if b, ok := s.store.(store.Blocklister); ok { + if b, ok := s.underlyingStore.(store.Blocklister); ok { s.grp.Add(1) go func() { s.enableBlocklist(b) @@ -190,13 +192,13 @@ func (s *Server) receiveBlob(conn net.Conn) error { } var wantsBlob bool - if bl, ok := s.store.(store.Blocklister); ok { + if bl, ok := s.underlyingStore.(store.Blocklister); ok { wantsBlob, err = bl.Wants(blobHash) if err != nil { return err } } else { - blobExists, err := s.store.Has(blobHash) + blobExists, err := s.underlyingStore.Has(blobHash) if err != nil { return err } @@ -206,7 +208,7 @@ func (s *Server) receiveBlob(conn net.Conn) error { var neededBlobs []string if isSdBlob && !wantsBlob { - if nbc, ok := s.store.(neededBlobChecker); ok { + if nbc, ok := s.underlyingStore.(neededBlobChecker); ok { neededBlobs, err = nbc.MissingBlobsForKnownStream(blobHash) if err != nil { return err @@ -249,9 +251,9 @@ func (s *Server) receiveBlob(conn net.Conn) error { log.Debugln("Got blob " + blobHash[:8]) if isSdBlob { - err = s.store.PutSD(blobHash, blob) + err = s.outerStore.PutSD(blobHash, blob) } else { - err = s.store.Put(blobHash, blob) + err = s.outerStore.Put(blobHash, blob) } if err != nil { return err