Revert rocksdb changes to make sure it works with lbcd

This commit is contained in:
Jeffrey Picard 2022-01-15 18:05:05 +00:00
parent 04b3967115
commit 8e0fc8976d
5 changed files with 108 additions and 123 deletions

View file

@ -0,0 +1,58 @@
# FROM debian:10-slim
FROM python:3.7.12-slim-buster
ARG user=lbry
ARG db_dir=/database
ARG projects_dir=/home/$user
ARG DOCKER_TAG
ARG DOCKER_COMMIT=docker
ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
RUN apt-get update && \
apt-get -y --no-install-recommends install \
wget \
tar unzip \
build-essential \
automake libtool \
pkg-config \
librocksdb-dev
# python3.7 \
# python3-dev \
# python3-pip \
# python3-wheel \
# python3-cffi \
# python3-setuptools && \
# update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
# rm -rf /var/lib/apt/lists/*
# RUN pip install lbry-rocksdb
RUN pip install uvloop
RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
RUN mkdir -p $db_dir
RUN chown -R $user:$user $db_dir
COPY . $projects_dir
RUN chown -R $user:$user $projects_dir
USER $user
WORKDIR $projects_dir
RUN make install
RUN python3 docker/set_build.py
RUN rm ~/.cache -rf
# entry point
ARG host=0.0.0.0
ARG tcp_port=50001
ARG daemon_url=https://lbry:lbry@192.99.151.178:9245/
VOLUME $db_dir
ENV TCP_PORT=$tcp_port
ENV HOST=$host
ENV DAEMON_URL=$daemon_url
ENV DB_DIRECTORY=$db_dir
ENV MAX_SESSIONS=1000000000
ENV MAX_SEND=1000000000000000000
ENV EVENT_LOOP_POLICY=uvloop
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

View file

@ -53,7 +53,7 @@ class Daemon:
self.max_retry = max_retry
self._height = None
self.available_rpcs = {}
self.connector = aiohttp.TCPConnector()
self.connector = aiohttp.TCPConnector(ssl=False) # don't verify certs
self._block_hash_cache = LRUCacheWithMetrics(100000)
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)

View file

@ -1,130 +1,32 @@
import struct
import rocksdb
from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
class RocksDBStore:
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
# Use snappy compression (the default)
self.path = path
self._max_open_files = max_open_files
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
# self.multi_get = self.db.multi_get
def get_options(self):
return rocksdb.Options(
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
max_open_files=self._max_open_files
)
class KeyValueStorage:
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
raise NotImplemented()
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return RocksDBIterator(
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value
)
raise NotImplemented()
def write_batch(self, disable_wal: bool = False, sync: bool = False):
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
def write_batch(self, transaction: bool = False):
raise NotImplemented()
def close(self):
self.db.close()
self.db = None
raise NotImplemented()
@property
def closed(self) -> bool:
return self.db is None
def try_catch_up_with_primary(self):
self.db.try_catch_up_with_primary()
class RocksDBWriteBatch:
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
self.batch = rocksdb.WriteBatch()
self.db = db
self.sync = sync
self.disable_wal = disable_wal
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
class RocksDBIterator:
"""An iterator for RocksDB."""
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev_k',
'reverse',
'include_start',
'include_stop'
]
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
include_key: bool = True, include_value: bool = True, reverse: bool = False,
include_start: bool = True, include_stop: bool = False):
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
self.start = start
self.prefix = prefix
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev_k = None
self.reverse = reverse
self.include_start = include_start
self.include_stop = include_stop
def __iter__(self):
return self
def _check_stop_iteration(self, key: bytes):
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
raise StopIteration
elif self.start is not None and self.start > key[:len(self.start)]:
raise StopIteration
elif self.prefix is not None and not key.startswith(self.prefix):
raise StopIteration
def __next__(self):
# TODO: include start/stop on/off
# check for needing to stop from previous iteration
if self.prev_k is not None:
self._check_stop_iteration(self.prev_k)
k, v = next(self.iterator)
self._check_stop_iteration(k)
self.prev_k = k
if self.include_key and self.include_value:
return k, v
elif self.include_key:
return k
return v
raise NotImplemented()
class PrefixDB:
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
self._db = db
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth
@ -135,7 +37,7 @@ class PrefixDB:
Changes written cannot be undone
"""
try:
with self._db.write_batch(sync=True) as batch:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -159,7 +61,7 @@ class PrefixDB:
include_value=False
))
try:
with self._db.write_batch(sync=True) as batch:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -180,7 +82,7 @@ class PrefixDB:
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
try:
with self._db.write_batch(sync=True) as batch:
with self._db.write_batch(transaction=True) as batch:
batch_put = batch.put
batch_delete = batch.delete
for staged_change in self._op_stack:
@ -206,9 +108,6 @@ class PrefixDB:
if not self._db.closed:
self._db.close()
def try_catch_up_with_primary(self):
self._db.try_catch_up_with_primary()
@property
def closed(self):
return self._db.closed

View file

@ -4,7 +4,7 @@ import array
import base64
from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
value_struct: struct.Struct
key_part_lambdas = []
def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
self._db = db
self._op_stack = op_stack
@ -1595,10 +1595,40 @@ class BlockTxsPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=max_open_files,
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
return self.db.get(key, fill_cache=fill_cache)
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
include_key=True, include_value=True, fill_cache=True):
return self.db.iterator(
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
)
def write_batch(self, transaction: bool = False, sync: bool = False):
return self.db.write_batch(transaction=transaction, sync=sync)
def close(self):
return self.db.close()
@property
def closed(self) -> bool:
return self.db.closed
class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
db = LevelDBStore(path, cache_mb, max_open_files)
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)

View file

@ -7,11 +7,9 @@ BASE = os.path.dirname(__file__)
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
long_description = fh.read()
ROCKSDB = []
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
ROCKSDB.append('lbry-rocksdb==0.8.1')
PLYVEL = []
if sys.platform.startswith('linux'):
PLYVEL.append('plyvel==1.3.0')
setup(
name=__name__,
@ -59,7 +57,7 @@ setup(
'pylru==1.1.0',
'elasticsearch==7.10.1',
'grpcio==1.38.0'
] + ROCKSDB,
] + PLYVEL,
extras_require={
'torrent': ['lbry-libtorrent'],
'lint': ['pylint==2.10.0'],