From 976387fefb105586b454bcd91f19468bc0ae0c37 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 29 Nov 2020 13:38:35 -0500 Subject: [PATCH] RocksDBIterator --- lbry/wallet/server/storage.py | 69 ++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/lbry/wallet/server/storage.py b/lbry/wallet/server/storage.py index 127166204..ca24daa99 100644 --- a/lbry/wallet/server/storage.py +++ b/lbry/wallet/server/storage.py @@ -25,18 +25,18 @@ def db_class(db_dir, name): class Storage: """Abstract base class of the DB backend abstraction.""" - def __init__(self, db_dir, name, for_sync): + def __init__(self, db_dir, name, for_sync, read_only=False): self.db_dir = db_dir self.is_new = not os.path.exists(os.path.join(db_dir, name)) self.for_sync = for_sync or self.is_new - self.open(name, create=self.is_new) + self.open(name, create=self.is_new, read_only=read_only) @classmethod def import_module(cls): """Import the DB engine module.""" raise NotImplementedError - def open(self, name, create): + def open(self, name, create, read_only=False): """Open an existing database or create a new one.""" raise NotImplementedError @@ -77,7 +77,7 @@ class LevelDB(Storage): import plyvel cls.module = plyvel - def open(self, name, create, lru_cache_size=None): + def open(self, name, create, read_only=False): mof = 10000 path = os.path.join(self.db_dir, name) # Use snappy compression (the default) @@ -97,17 +97,18 @@ class RocksDB(Storage): import rocksdb cls.module = rocksdb - def open(self, name, create): - mof = 512 if self.for_sync else 128 + def open(self, name, create, read_only=False): + mof = 10000 path = os.path.join(self.db_dir, name) # Use snappy compression (the default) options = self.module.Options(create_if_missing=create, use_fsync=True, target_file_size_base=33554432, max_open_files=mof) - self.db = self.module.DB(path, options) + self.db = self.module.DB(path, options, read_only=read_only) self.get = self.db.get self.put = self.db.put + self.multi_get = self.db.multi_get def close(self): # PyRocksDB doesn't provide a close method; hopefully this is enough @@ -118,8 +119,8 @@ class RocksDB(Storage): def write_batch(self): return RocksDBWriteBatch(self.db) - def iterator(self, prefix=b'', reverse=False): - return RocksDBIterator(self.db, prefix, reverse) + def iterator(self, **kwargs): + return RocksDBIterator(self.db, **kwargs) class RocksDBWriteBatch: @@ -140,28 +141,44 @@ class RocksDBWriteBatch: class RocksDBIterator: """An iterator for RocksDB.""" - def __init__(self, db, prefix, reverse): + __slots__ = [ + 'start', + 'prefix', + 'stop', + 'iterator', + 'include_key', + 'include_value', + 'prev', + 'reverse' + ] + + def __init__(self, db, prefix=None, start=None, stop=None, include_key=True, include_value=True, reverse=False): + assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix' + self.start = start self.prefix = prefix - if reverse: - self.iterator = reversed(db.iteritems()) - nxt_prefix = util.increment_byte_string(prefix) - if nxt_prefix: - self.iterator.seek(nxt_prefix) - try: - next(self.iterator) - except StopIteration: - self.iterator.seek(nxt_prefix) - else: - self.iterator.seek_to_last() - else: - self.iterator = db.iteritems() + self.stop = stop + self.iterator = db.iteritems() if not reverse else reversed(db.iteritems()) + if prefix is not None: self.iterator.seek(prefix) + elif start is not None: + self.iterator.seek(start) + self.include_key = include_key + self.include_value = include_value + self.prev = None + self.reverse = reverse def __iter__(self): return self def __next__(self): - k, v = next(self.iterator) - if not k.startswith(self.prefix): + if None not in (self.stop, self.prev) and self.prev.startswith(self.stop): raise StopIteration - return k, v + self.prev, v = next(self.iterator) + prev = self.prev + if self.prefix is not None and not prev.startswith(self.prefix): + raise StopIteration + if self.include_key and self.include_value: + return prev, v + elif self.include_key: + return prev + return v