use rocksdb instead of leveldb

-dont use block processor directly from session manager
This commit is contained in:
Jack Robison 2021-11-08 14:18:22 -05:00
parent dd5b9ca81b
commit 0a71e2ff91
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 151 additions and 71 deletions

View file

@ -15,7 +15,6 @@ RUN apt-get update && \
build-essential \
automake libtool \
pkg-config \
libleveldb-dev \
python3.7 \
python3-dev \
python3-pip \

View file

@ -1,32 +1,130 @@
import struct
import rocksdb
from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
class KeyValueStorage:
class RocksDBStore:
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
# Use snappy compression (the default)
self.path = path
self._max_open_files = max_open_files
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
# self.multi_get = self.db.multi_get
def get_options(self):
return rocksdb.Options(
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
max_open_files=self._max_open_files
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
raise NotImplemented()
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
raise NotImplemented()
return RocksDBIterator(
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value
)
def write_batch(self, transaction: bool = False):
raise NotImplemented()
def write_batch(self, disable_wal: bool = False, sync: bool = False):
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
def close(self):
raise NotImplemented()
self.db.close()
self.db = None
@property
def closed(self) -> bool:
raise NotImplemented()
return self.db is None
def try_catch_up_with_primary(self):
self.db.try_catch_up_with_primary()
class RocksDBWriteBatch:
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
self.batch = rocksdb.WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
class RocksDBIterator:
"""An iterator for RocksDB."""
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev_k',
'reverse',
'include_start',
'include_stop'
]
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
include_key: bool = True, include_value: bool = True, reverse: bool = False,
include_start: bool = True, include_stop: bool = False):
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
self.start = start
self.prefix = prefix
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev_k = None
self.reverse = reverse
self.include_start = include_start
self.include_stop = include_stop
def __iter__(self):
return self
def _check_stop_iteration(self, key: bytes):
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
raise StopIteration
elif self.start is not None and self.start > key[:len(self.start)]:
raise StopIteration
elif self.prefix is not None and not key.startswith(self.prefix):
raise StopIteration
def __next__(self):
# TODO: include start/stop on/off
# check for needing to stop from previous iteration
if self.prev_k is not None:
self._check_stop_iteration(self.prev_k)
k, v = next(self.iterator)
self._check_stop_iteration(k)
self.prev_k = k
if self.include_key and self.include_value:
return k, v
elif self.include_key:
return k
return v
class PrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth
@ -37,7 +135,7 @@ class PrefixDB:
Changes written cannot be undone
"""
try:
with self._db.write_batch(transaction=True) as batch:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -61,7 +159,7 @@ class PrefixDB:
include_value=False
))
try:
with self._db.write_batch(transaction=True) as batch:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -82,7 +180,7 @@ class PrefixDB:
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
try:
with self._db.write_batch(transaction=True) as batch:
with self._db.write_batch(sync=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -108,6 +206,9 @@ class PrefixDB:
if not self._db.closed:
self._db.close()
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
@property
def closed(self):
return self._db.closed

View file

@ -4,7 +4,7 @@ import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct
key_part_lambdas = []
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=max_open_files,
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self.db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def write_batch(self, transaction: bool = False, sync: bool = False):
return self.db.write_batch(transaction=transaction, sync=sync)
def close(self):
return self.db.close()
@property
def closed(self) -> bool:
return self.db.closed
class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = LevelDBStore(path, cache_mb, max_open_files)
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)

View file

@ -26,7 +26,10 @@ class Server:
self.prometheus_server: typing.Optional[PrometheusServer] = None
self.session_mgr = LBRYSessionManager(
env, db, bp, daemon, self.shutdown_event
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
self.shutdown_event,
on_available_callback=bp.status_server.set_available,
on_unavailable_callback=bp.status_server.set_unavailable
)
self._indexer_task = None

View file

@ -170,13 +170,16 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
daemon: 'Daemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send)
self.env = env
self.db = db
self.bp = bp
self.on_available_callback = on_available_callback
self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon
self.mempool = bp.mempool
self.mempool = mempool
self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -186,7 +189,9 @@ class SessionManager:
self.cur_group = SessionGroup(0)
self.txs_sent = 0
self.start_time = time.time()
self.history_cache = self.bp.history_cache
self.history_cache = history_cache
self.resolve_cache = resolve_cache
self.resolve_outputs_cache = resolve_outputs_cache
self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
@ -243,7 +248,7 @@ class SessionManager:
await self.session_event.wait()
self.session_event.clear()
if not paused and len(self.sessions) >= max_sessions:
self.bp.status_server.set_unavailable()
self.on_unavailable_callback()
self.logger.info(f'maximum sessions {max_sessions:,d} '
f'reached, stopping new connections until '
f'count drops to {low_watermark:,d}')
@ -252,7 +257,7 @@ class SessionManager:
# Start listening for incoming connections if paused and
# session count has fallen
if paused and len(self.sessions) <= low_watermark:
self.bp.status_server.set_available()
self.on_available_callback()
self.logger.info('resuming listening for incoming connections')
await self._start_external_servers()
paused = False
@ -533,7 +538,7 @@ class SessionManager:
await self.start_other()
await self._start_external_servers()
server_listening_event.set()
self.bp.status_server.set_available()
self.on_available_callback()
# Peer discovery should start after the external servers
# because we connect to ourself
await asyncio.wait([
@ -628,8 +633,9 @@ class SessionManager:
for hashX in touched.intersection(self.mempool_statuses.keys()):
self.mempool_statuses.pop(hashX, None)
# self.bp._chain_executor
await asyncio.get_event_loop().run_in_executor(
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
)
if touched or new_touched or (height_changed and self.mempool_statuses):
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_tuple = self.PROTOCOL_MIN
self.protocol_string = None
self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp
self.db: LevelDB = self.bp.db
self.db: LevelDB = self.session_mgr.db
@classmethod
def protocol_min_max_strings(cls):
@ -1008,21 +1013,21 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url]
if url not in self.session_mgr.resolve_cache:
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.session_mgr.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
if sorted_urls in self.session_mgr.resolve_outputs_cache:
return self.session_mgr.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if url not in self.session_mgr.resolve_cache:
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
@ -1047,7 +1052,7 @@ class LBRYElectrumX(SessionBase):
if reposted_channel:
extra.append(reposted_channel)
await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None
)
return result
@ -1055,7 +1060,7 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self):
return self.bp.height
return self.db.db_height
async def transaction_get_height(self, tx_hash):
self.assert_tx_hash(tx_hash)

View file

@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
long_description = fh.read()
PLYVEL = []
if sys.platform.startswith('linux'):
PLYVEL.append('plyvel==1.3.0')
ROCKSDB = []
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
ROCKSDB.append('lbry-rocksdb==0.8.1')
setup(
name=__name__,
@ -58,7 +60,7 @@ setup(
'elasticsearch==7.10.1',
'grpcio==1.38.0',
'filetype==1.0.9'
] + PLYVEL,
] + ROCKSDB,
extras_require={
'torrent': ['lbry-libtorrent'],
'lint': ['pylint==2.10.0'],