write blobs to tmp dir to avoid corruption

This commit is contained in:
Niko Storni 2021-06-07 23:03:10 +02:00
parent 63aacd8a69
commit 2e101083e6

View file

@ -56,7 +56,7 @@ type DiskStore struct {
concurrentChecks atomic.Int32 concurrentChecks atomic.Int32
} }
const maxConcurrentChecks = 3 const maxConcurrentChecks = 30
// NewDiskStore returns an initialized file disk store pointer. // NewDiskStore returns an initialized file disk store pointer.
func NewDiskStore(dir string, prefixLength int) *DiskStore { func NewDiskStore(dir string, prefixLength int) *DiskStore {
@ -136,9 +136,10 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error {
if err != nil { if err != nil {
return err return err
} }
// Open file with O_DIRECT // Open file with O_DIRECT
flags := os.O_WRONLY | os.O_CREATE | syscall.O_DIRECT flags := os.O_WRONLY | os.O_CREATE | syscall.O_DIRECT
f, err := os.OpenFile(d.path(hash), flags, 0644) f, err := os.OpenFile(d.tmpPath(hash), flags, 0644)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
@ -152,6 +153,10 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error {
defer dio.Flush() defer dio.Flush()
// Write the body to file // Write the body to file
_, err = io.Copy(dio, bytes.NewReader(blob)) _, err = io.Copy(dio, bytes.NewReader(blob))
if err != nil {
return errors.Err(err)
}
err = os.Rename(d.tmpPath(hash), d.path(hash))
return errors.Err(err) return errors.Err(err)
} }
@ -195,11 +200,15 @@ func (d *DiskStore) dir(hash string) string {
} }
return path.Join(d.blobDir, hash[:d.prefixLength]) return path.Join(d.blobDir, hash[:d.prefixLength])
} }
func (d *DiskStore) tmpDir(hash string) string {
return path.Join(d.blobDir, "tmp")
}
func (d *DiskStore) path(hash string) string { func (d *DiskStore) path(hash string) string {
return path.Join(d.dir(hash), hash) return path.Join(d.dir(hash), hash)
} }
func (d *DiskStore) tmpPath(hash string) string {
return path.Join(d.tmpDir(hash), hash)
}
func (d *DiskStore) ensureDirExists(dir string) error { func (d *DiskStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755)) return errors.Err(os.MkdirAll(dir, 0755))
} }
@ -213,7 +222,10 @@ func (d *DiskStore) initOnce() error {
if err != nil { if err != nil {
return err return err
} }
err = d.ensureDirExists(path.Join(d.blobDir, "tmp"))
if err != nil {
return err
}
d.initialized = true d.initialized = true
return nil return nil
} }