when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50

Closed
shyba wants to merge 39 commits from insert_under_tx into master
Showing only changes of commit a574fecf4e - Show all commits

View file

@ -34,6 +34,7 @@ var (
useDB bool useDB bool
cloudFrontEndpoint string cloudFrontEndpoint string
reflectorCmdDiskCache string reflectorCmdDiskCache string
bufferReflectorCmdDiskCache string
reflectorCmdMemCache int reflectorCmdMemCache int
) )
@ -56,6 +57,8 @@ func init() {
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "", cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'") "enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "",
"enable buffer disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
@ -147,7 +150,19 @@ func setupStore() store.BlobStore {
func wrapWithCache(s store.BlobStore) store.BlobStore { func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped := s wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams() diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
log.Fatal(err)
}
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
)
}
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
if diskCacheMaxSize > 0 { if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm) err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil { if err != nil {
@ -159,7 +174,6 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize), store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
) )
} }
if reflectorCmdMemCache > 0 { if reflectorCmdMemCache > 0 {
wrapped = store.NewCachingStore( wrapped = store.NewCachingStore(
"reflector", "reflector",
@ -171,12 +185,12 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
return wrapped return wrapped
} }
func diskCacheParams() (int, string) { func diskCacheParams(diskParams string) (int, string) {
if reflectorCmdDiskCache == "" { if diskParams == "" {
return 0, "" return 0, ""
} }
parts := strings.Split(reflectorCmdDiskCache, ":") parts := strings.Split(diskParams, ":")
if len(parts) != 2 { if len(parts) != 2 {
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string") log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
} }