when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
4 changed files with 17 additions and 15 deletions
|
@ -76,7 +76,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
|
|||
outerStore := wrapWithCache(underlyingStore, cleanerStopper)
|
||||
|
||||
if !disableUploads {
|
||||
reflectorServer := reflector.NewServer(underlyingStore)
|
||||
reflectorServer := reflector.NewServer(underlyingStore, outerStore)
|
||||
reflectorServer.Timeout = 3 * time.Minute
|
||||
reflectorServer.EnableBlocklist = !disableBlocklist
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ func testCmd(cmd *cobra.Command, args []string) {
|
|||
|
||||
memStore := store.NewMemStore()
|
||||
|
||||
reflectorServer := reflector.NewServer(memStore)
|
||||
reflectorServer := reflector.NewServer(memStore, memStore)
|
||||
reflectorServer.Timeout = 3 * time.Minute
|
||||
|
||||
err := reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort))
|
||||
|
|
|
@ -79,7 +79,7 @@ func New(conf *Config) *Prism {
|
|||
dht: d,
|
||||
cluster: c,
|
||||
peer: peer.NewServer(conf.Blobs),
|
||||
reflector: reflector.NewServer(conf.Blobs),
|
||||
reflector: reflector.NewServer(conf.Blobs, conf.Blobs),
|
||||
|
||||
grp: stop.New(),
|
||||
}
|
||||
|
|
|
@ -40,16 +40,18 @@ type Server struct {
|
|||
|
||||
EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled
|
||||
|
||||
store store.BlobStore
|
||||
grp *stop.Group
|
||||
underlyingStore store.BlobStore
|
||||
outerStore store.BlobStore
|
||||
grp *stop.Group
|
||||
}
|
||||
|
||||
// NewServer returns an initialized reflector server pointer.
|
||||
func NewServer(store store.BlobStore) *Server {
|
||||
func NewServer(underlying store.BlobStore, outer store.BlobStore) *Server {
|
||||
return &Server{
|
||||
Timeout: DefaultTimeout,
|
||||
store: store,
|
||||
grp: stop.New(),
|
||||
Timeout: DefaultTimeout,
|
||||
underlyingStore: underlying,
|
||||
outerStore: outer,
|
||||
grp: stop.New(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,7 +87,7 @@ func (s *Server) Start(address string) error {
|
|||
}()
|
||||
|
||||
if s.EnableBlocklist {
|
||||
if b, ok := s.store.(store.Blocklister); ok {
|
||||
if b, ok := s.underlyingStore.(store.Blocklister); ok {
|
||||
s.grp.Add(1)
|
||||
go func() {
|
||||
s.enableBlocklist(b)
|
||||
|
@ -190,13 +192,13 @@ func (s *Server) receiveBlob(conn net.Conn) error {
|
|||
}
|
||||
|
||||
var wantsBlob bool
|
||||
if bl, ok := s.store.(store.Blocklister); ok {
|
||||
if bl, ok := s.underlyingStore.(store.Blocklister); ok {
|
||||
wantsBlob, err = bl.Wants(blobHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
blobExists, err := s.store.Has(blobHash)
|
||||
blobExists, err := s.underlyingStore.Has(blobHash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -206,7 +208,7 @@ func (s *Server) receiveBlob(conn net.Conn) error {
|
|||
var neededBlobs []string
|
||||
|
||||
if isSdBlob && !wantsBlob {
|
||||
if nbc, ok := s.store.(neededBlobChecker); ok {
|
||||
if nbc, ok := s.underlyingStore.(neededBlobChecker); ok {
|
||||
neededBlobs, err = nbc.MissingBlobsForKnownStream(blobHash)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -249,9 +251,9 @@ func (s *Server) receiveBlob(conn net.Conn) error {
|
|||
log.Debugln("Got blob " + blobHash[:8])
|
||||
|
||||
if isSdBlob {
|
||||
err = s.store.PutSD(blobHash, blob)
|
||||
err = s.outerStore.PutSD(blobHash, blob)
|
||||
} else {
|
||||
err = s.store.Put(blobHash, blob)
|
||||
err = s.outerStore.Put(blobHash, blob)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in a new issue