From 8e0fc8976d929f8e07e0b74c9c032cedd5d13c96 Mon Sep 17 00:00:00 2001
From: Jeffrey Picard
Date: Sat, 15 Jan 2022 18:05:05 +0000
Subject: [PATCH] Revert rocksdb changes to make sure it works with lbcd

---
 docker/Dockerfile.wallet_server_deploy |  58 ++++++++++++
 lbry/wallet/server/daemon.py           |   2 +-
 lbry/wallet/server/db/db.py            | 123 +++----------------------
 lbry/wallet/server/db/prefixes.py      |  38 +++++++-
 setup.py                               |  10 +-
 5 files changed, 108 insertions(+), 123 deletions(-)
 create mode 100644 docker/Dockerfile.wallet_server_deploy

diff --git a/docker/Dockerfile.wallet_server_deploy b/docker/Dockerfile.wallet_server_deploy
new file mode 100644
index 000000000..559307464
--- /dev/null
+++ b/docker/Dockerfile.wallet_server_deploy
@@ -0,0 +1,58 @@
+# FROM debian:10-slim
+FROM python:3.7.12-slim-buster
+
+ARG user=lbry
+ARG db_dir=/database
+ARG projects_dir=/home/$user
+
+ARG DOCKER_TAG
+ARG DOCKER_COMMIT=docker
+ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
+
+RUN apt-get update && \
+    apt-get -y --no-install-recommends install \
+      wget \
+      tar unzip \
+      build-essential \
+      automake libtool \
+      pkg-config \
+      librocksdb-dev
+#      python3.7 \
+#      python3-dev \
+#      python3-pip \
+#      python3-wheel \
+#      python3-cffi \
+#      python3-setuptools && \
+#    update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
+#    rm -rf /var/lib/apt/lists/*
+
+# RUN pip install lbry-rocksdb
+RUN pip install uvloop
+RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
+RUN mkdir -p $db_dir
+RUN chown -R $user:$user $db_dir
+
+COPY . $projects_dir
+RUN chown -R $user:$user $projects_dir
+
+USER $user
+WORKDIR $projects_dir
+
+RUN make install
+RUN python3 docker/set_build.py
+RUN rm ~/.cache -rf
+
+# entry point
+ARG host=0.0.0.0
+ARG tcp_port=50001
+ARG daemon_url=https://lbry:lbry@192.99.151.178:9245/
+VOLUME $db_dir
+ENV TCP_PORT=$tcp_port
+ENV HOST=$host
+ENV DAEMON_URL=$daemon_url
+ENV DB_DIRECTORY=$db_dir
+ENV MAX_SESSIONS=1000000000
+ENV MAX_SEND=1000000000000000000
+ENV EVENT_LOOP_POLICY=uvloop
+COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
+ENTRYPOINT ["/entrypoint.sh"]
diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py
index 3f4bd451f..8ed2dedfb 100644
--- a/lbry/wallet/server/daemon.py
+++ b/lbry/wallet/server/daemon.py
@@ -53,7 +53,7 @@ class Daemon:
         self.max_retry = max_retry
         self._height = None
         self.available_rpcs = {}
-        self.connector = aiohttp.TCPConnector()
+        self.connector = aiohttp.TCPConnector(ssl=False)  # don't verify certs
         self._block_hash_cache = LRUCacheWithMetrics(100000)
         self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
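
The one-line daemon.py change above is what lets the wallet server talk to an lbcd/lbrycrd daemon that serves a self-signed HTTPS certificate (such as the DAEMON_URL baked into the Dockerfile). A minimal sketch of the resulting client behaviour, assuming a placeholder URL, method name and RPC payload that are not taken from this patch:

import asyncio

import aiohttp


async def daemon_request(url: str, method: str, params=None):
    # ssl=False mirrors the connector change above: the certificate presented
    # by the daemon endpoint is accepted without verification.
    connector = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=connector) as session:
        payload = {'method': method, 'params': params or [], 'id': 1}
        async with session.post(url, json=payload) as resp:
            return await resp.json(content_type=None)


# Example (placeholder credentials and host):
# asyncio.run(daemon_request('https://lbry:lbry@127.0.0.1:9245/', 'getbestblockhash'))

Skipping verification is a convenience for a daemon on a trusted or internal network; it does nothing to authenticate the peer.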
diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py
index 1545ee05f..6d613df93 100644
--- a/lbry/wallet/server/db/db.py
+++ b/lbry/wallet/server/db/db.py
@@ -1,130 +1,32 @@
 import struct
-import rocksdb
 from typing import Optional
 from lbry.wallet.server.db import DB_PREFIXES
 from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
 
 
-class RocksDBStore:
-    def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
-        # Use snappy compression (the default)
-        self.path = path
-        self._max_open_files = max_open_files
-        self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
-        # self.multi_get = self.db.multi_get
-
-    def get_options(self):
-        return rocksdb.Options(
-            create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
-            max_open_files=self._max_open_files
-        )
-
+class KeyValueStorage:
     def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
-        return self.db.get(key, fill_cache=fill_cache)
+        raise NotImplemented()
 
     def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
                  include_key=True, include_value=True, fill_cache=True):
-        return RocksDBIterator(
-            self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
-            prefix=prefix, include_key=include_key, include_value=include_value
-        )
+        raise NotImplemented()
 
-    def write_batch(self, disable_wal: bool = False, sync: bool = False):
-        return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
+    def write_batch(self, transaction: bool = False):
+        raise NotImplemented()
 
     def close(self):
-        self.db.close()
-        self.db = None
+        raise NotImplemented()
 
     @property
     def closed(self) -> bool:
-        return self.db is None
-
-    def try_catch_up_with_primary(self):
-        self.db.try_catch_up_with_primary()
-
-
-class RocksDBWriteBatch:
-    def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
-        self.batch = rocksdb.WriteBatch()
-        self.db = db
-        self.sync = sync
-        self.disable_wal = disable_wal
-
-    def __enter__(self):
-        return self.batch
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if not exc_val:
-            self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
-
-
-class RocksDBIterator:
-    """An iterator for RocksDB."""
-
-    __slots__ = [
-        'start',
-        'prefix',
-        'stop',
-        'iterator',
-        'include_key',
-        'include_value',
-        'prev_k',
-        'reverse',
-        'include_start',
-        'include_stop'
-    ]
-
-    def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
-                 include_key: bool = True, include_value: bool = True, reverse: bool = False,
-                 include_start: bool = True, include_stop: bool = False):
-        assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
-        self.start = start
-        self.prefix = prefix
-        self.stop = stop
-        self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
-        if prefix is not None:
-            self.iterator.seek(prefix)
-        elif start is not None:
-            self.iterator.seek(start)
-        self.include_key = include_key
-        self.include_value = include_value
-        self.prev_k = None
-        self.reverse = reverse
-        self.include_start = include_start
-        self.include_stop = include_stop
-
-    def __iter__(self):
-        return self
-
-    def _check_stop_iteration(self, key: bytes):
-        if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
-            raise StopIteration
-        elif self.start is not None and self.start > key[:len(self.start)]:
-            raise StopIteration
-        elif self.prefix is not None and not key.startswith(self.prefix):
-            raise StopIteration
-
-    def __next__(self):
-        # TODO: include start/stop on/off
-        # check for needing to stop from previous iteration
-        if self.prev_k is not None:
-            self._check_stop_iteration(self.prev_k)
-        k, v = next(self.iterator)
-        self._check_stop_iteration(k)
-        self.prev_k = k
-
-        if self.include_key and self.include_value:
-            return k, v
-        elif self.include_key:
-            return k
-        return v
+        raise NotImplemented()
 
 
 class PrefixDB:
     UNDO_KEY_STRUCT = struct.Struct(b'>Q')
 
-    def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
+    def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
         self._db = db
         self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
         self._max_undo_depth = max_undo_depth
@@ -135,7 +37,7 @@ class PrefixDB:
         Changes written cannot be undone
         """
         try:
-            with self._db.write_batch(sync=True) as batch:
+            with self._db.write_batch(transaction=True) as batch:
                 batch_put = batch.put
                 batch_delete = batch.delete
                 for staged_change in self._op_stack:
@@ -159,7 +61,7 @@ class PrefixDB:
                 include_value=False
             ))
         try:
-            with self._db.write_batch(sync=True) as batch:
+            with self._db.write_batch(transaction=True) as batch:
                 batch_put = batch.put
                 batch_delete = batch.delete
                 for staged_change in self._op_stack:
@@ -180,7 +82,7 @@ class PrefixDB:
         undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
         self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
        try:
-            with self._db.write_batch(sync=True) as batch:
+            with self._db.write_batch(transaction=True) as batch:
                 batch_put = batch.put
                 batch_delete = batch.delete
                 for staged_change in self._op_stack:
@@ -206,9 +108,6 @@ class PrefixDB:
         if not self._db.closed:
             self._db.close()
 
-    def try_catch_up_with_primary(self):
-        self._db.try_catch_up_with_primary()
-
     @property
     def closed(self):
         return self._db.closed
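
The KeyValueStorage base class restored above is the seam that makes the backend swappable (RocksDB before this patch, LevelDB after it). PrefixDB only relies on get, a prefix-aware iterator, a write_batch context manager exposing put and delete, and close/closed. As a rough sketch of that contract (the class names and the simplified iterator semantics below are illustrative only, not part of the patch):

from typing import Optional

from lbry.wallet.server.db.db import KeyValueStorage  # the base class restored by this patch


class _DictBatch:
    """Mimics the batch object PrefixDB expects: put/delete, applied on clean exit."""
    def __init__(self, data: dict):
        self._data = data
        self._ops = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is None:  # discard the batch if the block raised
            for op, key, value in self._ops:
                if op == 'put':
                    self._data[key] = value
                else:
                    self._data.pop(key, None)

    def put(self, key: bytes, value: bytes):
        self._ops.append(('put', key, value))

    def delete(self, key: bytes):
        self._ops.append(('delete', key, None))


class DictStorage(KeyValueStorage):
    """Toy in-memory backend covering only the calls PrefixDB actually makes."""
    def __init__(self):
        self._data = {}
        self._open = True

    def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
        return self._data.get(key)

    def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False,
                 prefix=None, include_key=True, include_value=True, fill_cache=True):
        # Simplified: only prefix filtering and key/value selection are handled here.
        for key in sorted(self._data, reverse=reverse):
            if prefix is not None and not key.startswith(prefix):
                continue
            if include_key and include_value:
                yield key, self._data[key]
            elif include_key:
                yield key
            else:
                yield self._data[key]

    def write_batch(self, transaction: bool = False):
        return _DictBatch(self._data)

    def close(self):
        self._open = False

    @property
    def closed(self) -> bool:
        return not self._open

PrefixDB(DictStorage()) behaves the same over this toy backend as over the LevelDBStore added in the next file, which is what makes the revert a drop-in swap at the PrefixDB layer.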
diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py
index e51313097..4dbfe707e 100644
--- a/lbry/wallet/server/db/prefixes.py
+++ b/lbry/wallet/server/db/prefixes.py
@@ -4,7 +4,7 @@ import array
 import base64
 from typing import Union, Tuple, NamedTuple, Optional
 from lbry.wallet.server.db import DB_PREFIXES
-from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
+from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
 from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
 from lbry.schema.url import normalize_name
 
@@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
     value_struct: struct.Struct
     key_part_lambdas = []
 
-    def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
+    def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
         self._db = db
         self._op_stack = op_stack
 
@@ -1595,10 +1595,40 @@ class BlockTxsPrefixRow(PrefixRow):
         return cls.pack_key(height), cls.pack_value(tx_hashes)
 
 
+class LevelDBStore(KeyValueStorage):
+    def __init__(self, path: str, cache_mb: int, max_open_files: int):
+        import plyvel
+        self.db = plyvel.DB(
+            path, create_if_missing=True, max_open_files=max_open_files,
+            lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
+            max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
+        )
+
+    def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
+        return self.db.get(key, fill_cache=fill_cache)
+
+    def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
+                 include_key=True, include_value=True, fill_cache=True):
+        return self.db.iterator(
+            reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
+            prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
+        )
+
+    def write_batch(self, transaction: bool = False, sync: bool = False):
+        return self.db.write_batch(transaction=transaction, sync=sync)
+
+    def close(self):
+        return self.db.close()
+
+    @property
+    def closed(self) -> bool:
+        return self.db.closed
+
+
 class HubDB(PrefixDB):
     def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
-                 secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
-        db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
+                 unsafe_prefixes: Optional[typing.Set[bytes]] = None):
+        db = LevelDBStore(path, cache_mb, max_open_files)
         super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
         self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
         self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
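
A quick way to exercise the re-added LevelDBStore on its own (the temporary path and sample key below are placeholders; plyvel has to be installed, per the setup.py change that follows):

import tempfile

from lbry.wallet.server.db.prefixes import LevelDBStore

path = tempfile.mkdtemp()  # throwaway directory for the example
store = LevelDBStore(path, cache_mb=16, max_open_files=64)

# plyvel write batches support the context-manager protocol; with transaction=True
# the batch is discarded instead of written if the block raises.
with store.write_batch(transaction=True) as batch:
    batch.put(b'\x01example-key', b'example-value')

print(store.get(b'\x01example-key'))                      # b'example-value'
for key in store.iterator(prefix=b'\x01', include_value=False):
    print(key)                                            # b'\x01example-key'

store.close()
assert store.closed

Note that LevelDBStore still accepts sync as well and passes it through, so callers can ask plyvel for a synchronous write when durability matters.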
diff --git a/setup.py b/setup.py
index 91e661e9a..56832e8eb 100644
--- a/setup.py
+++ b/setup.py
@@ -7,11 +7,9 @@ BASE = os.path.dirname(__file__)
 with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
     long_description = fh.read()
 
-
-ROCKSDB = []
-if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
-    ROCKSDB.append('lbry-rocksdb==0.8.1')
-
+PLYVEL = []
+if sys.platform.startswith('linux'):
+    PLYVEL.append('plyvel==1.3.0')
 
 setup(
     name=__name__,
@@ -59,7 +57,7 @@ setup(
         'pylru==1.1.0',
         'elasticsearch==7.10.1',
         'grpcio==1.38.0'
-    ] + ROCKSDB,
+    ] + PLYVEL,
     extras_require={
         'torrent': ['lbry-libtorrent'],
         'lint': ['pylint==2.10.0'],
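
One practical consequence of the revert for code built on top of it: HubDB no longer takes the RocksDB-only secondary_path argument, and the try_catch_up_with_primary hook is gone, so a LevelDB-backed hub cannot be opened as a read-only secondary the way the RocksDB build could. Opening the database now looks roughly like this (the path is a placeholder, not taken from the patch):

from lbry.wallet.server.db.prefixes import HubDB

db = HubDB('/database/lbry-leveldb', cache_mb=128, reorg_limit=200, max_open_files=512)
try:
    # read and write through the prefix rows, e.g. db.claim_to_support
    pass
finally:
    db.close()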