use rocksdb instead of leveldb
-dont use block processor directly from session manager
This commit is contained in:
parent
f78e3825ca
commit
1992b83faf
6 changed files with 151 additions and 71 deletions
|
@ -15,7 +15,6 @@ RUN apt-get update && \
|
|||
build-essential \
|
||||
automake libtool \
|
||||
pkg-config \
|
||||
libleveldb-dev \
|
||||
python3.7 \
|
||||
python3-dev \
|
||||
python3-pip \
|
||||
|
|
|
@ -1,32 +1,130 @@
|
|||
import struct
|
||||
import rocksdb
|
||||
from typing import Optional
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||
|
||||
|
||||
class KeyValueStorage:
|
||||
class RocksDBStore:
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
|
||||
# Use snappy compression (the default)
|
||||
self.path = path
|
||||
self._max_open_files = max_open_files
|
||||
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
|
||||
# self.multi_get = self.db.multi_get
|
||||
|
||||
def get_options(self):
|
||||
return rocksdb.Options(
|
||||
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
|
||||
max_open_files=self._max_open_files
|
||||
)
|
||||
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
raise NotImplemented()
|
||||
return self.db.get(key, fill_cache=fill_cache)
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
raise NotImplemented()
|
||||
return RocksDBIterator(
|
||||
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||
prefix=prefix, include_key=include_key, include_value=include_value
|
||||
)
|
||||
|
||||
def write_batch(self, transaction: bool = False):
|
||||
raise NotImplemented()
|
||||
def write_batch(self, disable_wal: bool = False, sync: bool = False):
|
||||
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
|
||||
|
||||
def close(self):
|
||||
raise NotImplemented()
|
||||
self.db.close()
|
||||
self.db = None
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
raise NotImplemented()
|
||||
return self.db is None
|
||||
|
||||
def try_catch_up_with_primary(self):
|
||||
self.db.try_catch_up_with_primary()
|
||||
|
||||
|
||||
class RocksDBWriteBatch:
|
||||
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
|
||||
self.batch = rocksdb.WriteBatch()
|
||||
self.db = db
|
||||
self.sync = sync
|
||||
self.disable_wal = disable_wal
|
||||
|
||||
def __enter__(self):
|
||||
return self.batch
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if not exc_val:
|
||||
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
|
||||
|
||||
|
||||
class RocksDBIterator:
|
||||
"""An iterator for RocksDB."""
|
||||
|
||||
__slots__ = [
|
||||
'start',
|
||||
'prefix',
|
||||
'stop',
|
||||
'iterator',
|
||||
'include_key',
|
||||
'include_value',
|
||||
'prev_k',
|
||||
'reverse',
|
||||
'include_start',
|
||||
'include_stop'
|
||||
]
|
||||
|
||||
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
|
||||
include_key: bool = True, include_value: bool = True, reverse: bool = False,
|
||||
include_start: bool = True, include_stop: bool = False):
|
||||
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
|
||||
self.start = start
|
||||
self.prefix = prefix
|
||||
self.stop = stop
|
||||
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
|
||||
if prefix is not None:
|
||||
self.iterator.seek(prefix)
|
||||
elif start is not None:
|
||||
self.iterator.seek(start)
|
||||
self.include_key = include_key
|
||||
self.include_value = include_value
|
||||
self.prev_k = None
|
||||
self.reverse = reverse
|
||||
self.include_start = include_start
|
||||
self.include_stop = include_stop
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def _check_stop_iteration(self, key: bytes):
|
||||
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
|
||||
raise StopIteration
|
||||
elif self.start is not None and self.start > key[:len(self.start)]:
|
||||
raise StopIteration
|
||||
elif self.prefix is not None and not key.startswith(self.prefix):
|
||||
raise StopIteration
|
||||
|
||||
def __next__(self):
|
||||
# TODO: include start/stop on/off
|
||||
# check for needing to stop from previous iteration
|
||||
if self.prev_k is not None:
|
||||
self._check_stop_iteration(self.prev_k)
|
||||
k, v = next(self.iterator)
|
||||
self._check_stop_iteration(k)
|
||||
self.prev_k = k
|
||||
|
||||
if self.include_key and self.include_value:
|
||||
return k, v
|
||||
elif self.include_key:
|
||||
return k
|
||||
return v
|
||||
|
||||
|
||||
class PrefixDB:
|
||||
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
||||
|
||||
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||
self._db = db
|
||||
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
||||
self._max_undo_depth = max_undo_depth
|
||||
|
@ -37,7 +135,7 @@ class PrefixDB:
|
|||
Changes written cannot be undone
|
||||
"""
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
|
@ -61,7 +159,7 @@ class PrefixDB:
|
|||
include_value=False
|
||||
))
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
|
@ -82,7 +180,7 @@ class PrefixDB:
|
|||
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
|
||||
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
|
||||
try:
|
||||
with self._db.write_batch(transaction=True) as batch:
|
||||
with self._db.write_batch(sync=True) as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
for staged_change in self._op_stack:
|
||||
|
@ -108,6 +206,9 @@ class PrefixDB:
|
|||
if not self._db.closed:
|
||||
self._db.close()
|
||||
|
||||
def try_catch_up_with_primary(self):
|
||||
self._db.try_catch_up_with_primary()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._db.closed
|
||||
|
|
|
@ -4,7 +4,7 @@ import array
|
|||
import base64
|
||||
from typing import Union, Tuple, NamedTuple, Optional
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
|
||||
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
|
||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||
from lbry.schema.url import normalize_name
|
||||
|
||||
|
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
|
|||
value_struct: struct.Struct
|
||||
key_part_lambdas = []
|
||||
|
||||
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
|
||||
def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
|
||||
self._db = db
|
||||
self._op_stack = op_stack
|
||||
|
||||
|
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
|
|||
return cls.pack_key(height), cls.pack_value(tx_hashes)
|
||||
|
||||
|
||||
class LevelDBStore(KeyValueStorage):
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
||||
import plyvel
|
||||
self.db = plyvel.DB(
|
||||
path, create_if_missing=True, max_open_files=max_open_files,
|
||||
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
||||
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
||||
)
|
||||
|
||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||
return self.db.get(key, fill_cache=fill_cache)
|
||||
|
||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||
include_key=True, include_value=True, fill_cache=True):
|
||||
return self.db.iterator(
|
||||
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
|
||||
)
|
||||
|
||||
def write_batch(self, transaction: bool = False, sync: bool = False):
|
||||
return self.db.write_batch(transaction=transaction, sync=sync)
|
||||
|
||||
def close(self):
|
||||
return self.db.close()
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
return self.db.closed
|
||||
|
||||
|
||||
class HubDB(PrefixDB):
|
||||
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
|
||||
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||
db = LevelDBStore(path, cache_mb, max_open_files)
|
||||
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||
db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
|
||||
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
|
||||
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
||||
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
||||
|
|
|
@ -26,7 +26,10 @@ class Server:
|
|||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||
|
||||
self.session_mgr = LBRYSessionManager(
|
||||
env, db, bp, daemon, self.shutdown_event
|
||||
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
|
||||
self.shutdown_event,
|
||||
on_available_callback=bp.status_server.set_available,
|
||||
on_unavailable_callback=bp.status_server.set_unavailable
|
||||
)
|
||||
self._indexer_task = None
|
||||
|
||||
|
|
|
@ -170,13 +170,16 @@ class SessionManager:
|
|||
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||
)
|
||||
|
||||
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
|
||||
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
|
||||
daemon: 'Daemon', shutdown_event: asyncio.Event,
|
||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||
env.max_send = max(350000, env.max_send)
|
||||
self.env = env
|
||||
self.db = db
|
||||
self.bp = bp
|
||||
self.on_available_callback = on_available_callback
|
||||
self.on_unavailable_callback = on_unavailable_callback
|
||||
self.daemon = daemon
|
||||
self.mempool = bp.mempool
|
||||
self.mempool = mempool
|
||||
self.shutdown_event = shutdown_event
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||
|
@ -186,7 +189,9 @@ class SessionManager:
|
|||
self.cur_group = SessionGroup(0)
|
||||
self.txs_sent = 0
|
||||
self.start_time = time.time()
|
||||
self.history_cache = self.bp.history_cache
|
||||
self.history_cache = history_cache
|
||||
self.resolve_cache = resolve_cache
|
||||
self.resolve_outputs_cache = resolve_outputs_cache
|
||||
self.notified_height: typing.Optional[int] = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
|
@ -243,7 +248,7 @@ class SessionManager:
|
|||
await self.session_event.wait()
|
||||
self.session_event.clear()
|
||||
if not paused and len(self.sessions) >= max_sessions:
|
||||
self.bp.status_server.set_unavailable()
|
||||
self.on_unavailable_callback()
|
||||
self.logger.info(f'maximum sessions {max_sessions:,d} '
|
||||
f'reached, stopping new connections until '
|
||||
f'count drops to {low_watermark:,d}')
|
||||
|
@ -252,7 +257,7 @@ class SessionManager:
|
|||
# Start listening for incoming connections if paused and
|
||||
# session count has fallen
|
||||
if paused and len(self.sessions) <= low_watermark:
|
||||
self.bp.status_server.set_available()
|
||||
self.on_available_callback()
|
||||
self.logger.info('resuming listening for incoming connections')
|
||||
await self._start_external_servers()
|
||||
paused = False
|
||||
|
@ -533,7 +538,7 @@ class SessionManager:
|
|||
await self.start_other()
|
||||
await self._start_external_servers()
|
||||
server_listening_event.set()
|
||||
self.bp.status_server.set_available()
|
||||
self.on_available_callback()
|
||||
# Peer discovery should start after the external servers
|
||||
# because we connect to ourself
|
||||
await asyncio.wait([
|
||||
|
@ -628,8 +633,9 @@ class SessionManager:
|
|||
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
||||
self.mempool_statuses.pop(hashX, None)
|
||||
|
||||
# self.bp._chain_executor
|
||||
await asyncio.get_event_loop().run_in_executor(
|
||||
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
)
|
||||
|
||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
||||
|
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
|
|||
self.protocol_tuple = self.PROTOCOL_MIN
|
||||
self.protocol_string = None
|
||||
self.daemon = self.session_mgr.daemon
|
||||
self.bp: BlockProcessor = self.session_mgr.bp
|
||||
self.db: LevelDB = self.bp.db
|
||||
self.db: LevelDB = self.session_mgr.db
|
||||
|
||||
@classmethod
|
||||
def protocol_min_max_strings(cls):
|
||||
|
@ -1008,21 +1013,21 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
async def _cached_resolve_url(self, url):
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.bp.resolve_cache[url]
|
||||
if url not in self.session_mgr.resolve_cache:
|
||||
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.session_mgr.resolve_cache[url]
|
||||
|
||||
async def claimtrie_resolve(self, *urls) -> str:
|
||||
sorted_urls = tuple(sorted(urls))
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||
try:
|
||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||
if sorted_urls in self.session_mgr.resolve_outputs_cache:
|
||||
return self.session_mgr.resolve_outputs_cache[sorted_urls]
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
||||
if url not in self.session_mgr.resolve_cache:
|
||||
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
|
@ -1047,7 +1052,7 @@ class LBRYElectrumX(SessionBase):
|
|||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
await asyncio.sleep(0)
|
||||
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||
)
|
||||
return result
|
||||
|
@ -1055,7 +1060,7 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.bp.height
|
||||
return self.db.db_height
|
||||
|
||||
async def transaction_get_height(self, tx_hash):
|
||||
self.assert_tx_hash(tx_hash)
|
||||
|
|
10
setup.py
10
setup.py
|
@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
|
|||
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
|
||||
long_description = fh.read()
|
||||
|
||||
PLYVEL = []
|
||||
if sys.platform.startswith('linux'):
|
||||
PLYVEL.append('plyvel==1.3.0')
|
||||
|
||||
ROCKSDB = []
|
||||
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
|
||||
ROCKSDB.append('lbry-rocksdb==0.8.1')
|
||||
|
||||
|
||||
setup(
|
||||
name=__name__,
|
||||
|
@ -57,7 +59,7 @@ setup(
|
|||
'pylru==1.1.0',
|
||||
'elasticsearch==7.10.1',
|
||||
'grpcio==1.38.0'
|
||||
] + PLYVEL,
|
||||
] + ROCKSDB,
|
||||
extras_require={
|
||||
'torrent': ['lbry-libtorrent'],
|
||||
'lint': ['pylint==2.10.0'],
|
||||
|
|
Loading…
Reference in a new issue