use rocksdb instead of leveldb
-dont use block processor directly from session manager
This commit is contained in:
parent
b5ead91746
commit
bf3dec9c89
6 changed files with 151 additions and 71 deletions
|
@ -15,7 +15,6 @@ RUN apt-get update && \
|
||||||
build-essential \
|
build-essential \
|
||||||
automake libtool \
|
automake libtool \
|
||||||
pkg-config \
|
pkg-config \
|
||||||
libleveldb-dev \
|
|
||||||
python3.7 \
|
python3.7 \
|
||||||
python3-dev \
|
python3-dev \
|
||||||
python3-pip \
|
python3-pip \
|
||||||
|
|
|
@ -1,32 +1,130 @@
|
||||||
import struct
|
import struct
|
||||||
|
import rocksdb
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
from lbry.wallet.server.db import DB_PREFIXES
|
||||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||||
|
|
||||||
|
|
||||||
class KeyValueStorage:
|
class RocksDBStore:
|
||||||
|
def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
|
||||||
|
# Use snappy compression (the default)
|
||||||
|
self.path = path
|
||||||
|
self._max_open_files = max_open_files
|
||||||
|
self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
|
||||||
|
# self.multi_get = self.db.multi_get
|
||||||
|
|
||||||
|
def get_options(self):
|
||||||
|
return rocksdb.Options(
|
||||||
|
create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
|
||||||
|
max_open_files=self._max_open_files
|
||||||
|
)
|
||||||
|
|
||||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
||||||
raise NotImplemented()
|
return self.db.get(key, fill_cache=fill_cache)
|
||||||
|
|
||||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
||||||
include_key=True, include_value=True, fill_cache=True):
|
include_key=True, include_value=True, fill_cache=True):
|
||||||
raise NotImplemented()
|
return RocksDBIterator(
|
||||||
|
self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
||||||
|
prefix=prefix, include_key=include_key, include_value=include_value
|
||||||
|
)
|
||||||
|
|
||||||
def write_batch(self, transaction: bool = False):
|
def write_batch(self, disable_wal: bool = False, sync: bool = False):
|
||||||
raise NotImplemented()
|
return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
raise NotImplemented()
|
self.db.close()
|
||||||
|
self.db = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def closed(self) -> bool:
|
def closed(self) -> bool:
|
||||||
raise NotImplemented()
|
return self.db is None
|
||||||
|
|
||||||
|
def try_catch_up_with_primary(self):
|
||||||
|
self.db.try_catch_up_with_primary()
|
||||||
|
|
||||||
|
|
||||||
|
class RocksDBWriteBatch:
|
||||||
|
def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
|
||||||
|
self.batch = rocksdb.WriteBatch()
|
||||||
|
self.db = db
|
||||||
|
self.sync = sync
|
||||||
|
self.disable_wal = disable_wal
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self.batch
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
if not exc_val:
|
||||||
|
self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
|
||||||
|
|
||||||
|
|
||||||
|
class RocksDBIterator:
|
||||||
|
"""An iterator for RocksDB."""
|
||||||
|
|
||||||
|
__slots__ = [
|
||||||
|
'start',
|
||||||
|
'prefix',
|
||||||
|
'stop',
|
||||||
|
'iterator',
|
||||||
|
'include_key',
|
||||||
|
'include_value',
|
||||||
|
'prev_k',
|
||||||
|
'reverse',
|
||||||
|
'include_start',
|
||||||
|
'include_stop'
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bool = None, stop: bytes = None,
|
||||||
|
include_key: bool = True, include_value: bool = True, reverse: bool = False,
|
||||||
|
include_start: bool = True, include_stop: bool = False):
|
||||||
|
assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
|
||||||
|
self.start = start
|
||||||
|
self.prefix = prefix
|
||||||
|
self.stop = stop
|
||||||
|
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
|
||||||
|
if prefix is not None:
|
||||||
|
self.iterator.seek(prefix)
|
||||||
|
elif start is not None:
|
||||||
|
self.iterator.seek(start)
|
||||||
|
self.include_key = include_key
|
||||||
|
self.include_value = include_value
|
||||||
|
self.prev_k = None
|
||||||
|
self.reverse = reverse
|
||||||
|
self.include_start = include_start
|
||||||
|
self.include_stop = include_stop
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def _check_stop_iteration(self, key: bytes):
|
||||||
|
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
|
||||||
|
raise StopIteration
|
||||||
|
elif self.start is not None and self.start > key[:len(self.start)]:
|
||||||
|
raise StopIteration
|
||||||
|
elif self.prefix is not None and not key.startswith(self.prefix):
|
||||||
|
raise StopIteration
|
||||||
|
|
||||||
|
def __next__(self):
|
||||||
|
# TODO: include start/stop on/off
|
||||||
|
# check for needing to stop from previous iteration
|
||||||
|
if self.prev_k is not None:
|
||||||
|
self._check_stop_iteration(self.prev_k)
|
||||||
|
k, v = next(self.iterator)
|
||||||
|
self._check_stop_iteration(k)
|
||||||
|
self.prev_k = k
|
||||||
|
|
||||||
|
if self.include_key and self.include_value:
|
||||||
|
return k, v
|
||||||
|
elif self.include_key:
|
||||||
|
return k
|
||||||
|
return v
|
||||||
|
|
||||||
|
|
||||||
class PrefixDB:
|
class PrefixDB:
|
||||||
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
UNDO_KEY_STRUCT = struct.Struct(b'>Q')
|
||||||
|
|
||||||
def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
|
def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
|
||||||
self._db = db
|
self._db = db
|
||||||
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
|
||||||
self._max_undo_depth = max_undo_depth
|
self._max_undo_depth = max_undo_depth
|
||||||
|
@ -37,7 +135,7 @@ class PrefixDB:
|
||||||
Changes written cannot be undone
|
Changes written cannot be undone
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with self._db.write_batch(transaction=True) as batch:
|
with self._db.write_batch(sync=True) as batch:
|
||||||
batch_put = batch.put
|
batch_put = batch.put
|
||||||
batch_delete = batch.delete
|
batch_delete = batch.delete
|
||||||
for staged_change in self._op_stack:
|
for staged_change in self._op_stack:
|
||||||
|
@ -61,7 +159,7 @@ class PrefixDB:
|
||||||
include_value=False
|
include_value=False
|
||||||
))
|
))
|
||||||
try:
|
try:
|
||||||
with self._db.write_batch(transaction=True) as batch:
|
with self._db.write_batch(sync=True) as batch:
|
||||||
batch_put = batch.put
|
batch_put = batch.put
|
||||||
batch_delete = batch.delete
|
batch_delete = batch.delete
|
||||||
for staged_change in self._op_stack:
|
for staged_change in self._op_stack:
|
||||||
|
@ -82,7 +180,7 @@ class PrefixDB:
|
||||||
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
|
undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
|
||||||
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
|
self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
|
||||||
try:
|
try:
|
||||||
with self._db.write_batch(transaction=True) as batch:
|
with self._db.write_batch(sync=True) as batch:
|
||||||
batch_put = batch.put
|
batch_put = batch.put
|
||||||
batch_delete = batch.delete
|
batch_delete = batch.delete
|
||||||
for staged_change in self._op_stack:
|
for staged_change in self._op_stack:
|
||||||
|
@ -108,6 +206,9 @@ class PrefixDB:
|
||||||
if not self._db.closed:
|
if not self._db.closed:
|
||||||
self._db.close()
|
self._db.close()
|
||||||
|
|
||||||
|
def try_catch_up_with_primary(self):
|
||||||
|
self._db.try_catch_up_with_primary()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def closed(self):
|
def closed(self):
|
||||||
return self._db.closed
|
return self._db.closed
|
||||||
|
|
|
@ -4,7 +4,7 @@ import array
|
||||||
import base64
|
import base64
|
||||||
from typing import Union, Tuple, NamedTuple, Optional
|
from typing import Union, Tuple, NamedTuple, Optional
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
from lbry.wallet.server.db import DB_PREFIXES
|
||||||
from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
|
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
|
||||||
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
|
||||||
from lbry.schema.url import normalize_name
|
from lbry.schema.url import normalize_name
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
|
||||||
value_struct: struct.Struct
|
value_struct: struct.Struct
|
||||||
key_part_lambdas = []
|
key_part_lambdas = []
|
||||||
|
|
||||||
def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
|
def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
|
||||||
self._db = db
|
self._db = db
|
||||||
self._op_stack = op_stack
|
self._op_stack = op_stack
|
||||||
|
|
||||||
|
@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
|
||||||
return cls.pack_key(height), cls.pack_value(tx_hashes)
|
return cls.pack_key(height), cls.pack_value(tx_hashes)
|
||||||
|
|
||||||
|
|
||||||
class LevelDBStore(KeyValueStorage):
|
|
||||||
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
|
||||||
import plyvel
|
|
||||||
self.db = plyvel.DB(
|
|
||||||
path, create_if_missing=True, max_open_files=max_open_files,
|
|
||||||
lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
|
|
||||||
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
|
|
||||||
)
|
|
||||||
|
|
||||||
def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
|
|
||||||
return self.db.get(key, fill_cache=fill_cache)
|
|
||||||
|
|
||||||
def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
|
|
||||||
include_key=True, include_value=True, fill_cache=True):
|
|
||||||
return self.db.iterator(
|
|
||||||
reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
|
|
||||||
prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
|
|
||||||
)
|
|
||||||
|
|
||||||
def write_batch(self, transaction: bool = False, sync: bool = False):
|
|
||||||
return self.db.write_batch(transaction=transaction, sync=sync)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
return self.db.close()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def closed(self) -> bool:
|
|
||||||
return self.db.closed
|
|
||||||
|
|
||||||
|
|
||||||
class HubDB(PrefixDB):
|
class HubDB(PrefixDB):
|
||||||
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
|
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
|
||||||
unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
|
||||||
db = LevelDBStore(path, cache_mb, max_open_files)
|
db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
|
||||||
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
|
super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
|
||||||
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
|
||||||
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
|
||||||
|
|
|
@ -26,7 +26,10 @@ class Server:
|
||||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||||
|
|
||||||
self.session_mgr = LBRYSessionManager(
|
self.session_mgr = LBRYSessionManager(
|
||||||
env, db, bp, daemon, self.shutdown_event
|
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
|
||||||
|
self.shutdown_event,
|
||||||
|
on_available_callback=bp.status_server.set_available,
|
||||||
|
on_unavailable_callback=bp.status_server.set_unavailable
|
||||||
)
|
)
|
||||||
self._indexer_task = None
|
self._indexer_task = None
|
||||||
|
|
||||||
|
|
|
@ -170,13 +170,16 @@ class SessionManager:
|
||||||
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
|
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
|
||||||
|
daemon: 'Daemon', shutdown_event: asyncio.Event,
|
||||||
|
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||||
env.max_send = max(350000, env.max_send)
|
env.max_send = max(350000, env.max_send)
|
||||||
self.env = env
|
self.env = env
|
||||||
self.db = db
|
self.db = db
|
||||||
self.bp = bp
|
self.on_available_callback = on_available_callback
|
||||||
|
self.on_unavailable_callback = on_unavailable_callback
|
||||||
self.daemon = daemon
|
self.daemon = daemon
|
||||||
self.mempool = bp.mempool
|
self.mempool = mempool
|
||||||
self.shutdown_event = shutdown_event
|
self.shutdown_event = shutdown_event
|
||||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||||
|
@ -186,7 +189,9 @@ class SessionManager:
|
||||||
self.cur_group = SessionGroup(0)
|
self.cur_group = SessionGroup(0)
|
||||||
self.txs_sent = 0
|
self.txs_sent = 0
|
||||||
self.start_time = time.time()
|
self.start_time = time.time()
|
||||||
self.history_cache = self.bp.history_cache
|
self.history_cache = history_cache
|
||||||
|
self.resolve_cache = resolve_cache
|
||||||
|
self.resolve_outputs_cache = resolve_outputs_cache
|
||||||
self.notified_height: typing.Optional[int] = None
|
self.notified_height: typing.Optional[int] = None
|
||||||
# Cache some idea of room to avoid recounting on each subscription
|
# Cache some idea of room to avoid recounting on each subscription
|
||||||
self.subs_room = 0
|
self.subs_room = 0
|
||||||
|
@ -243,7 +248,7 @@ class SessionManager:
|
||||||
await self.session_event.wait()
|
await self.session_event.wait()
|
||||||
self.session_event.clear()
|
self.session_event.clear()
|
||||||
if not paused and len(self.sessions) >= max_sessions:
|
if not paused and len(self.sessions) >= max_sessions:
|
||||||
self.bp.status_server.set_unavailable()
|
self.on_unavailable_callback()
|
||||||
self.logger.info(f'maximum sessions {max_sessions:,d} '
|
self.logger.info(f'maximum sessions {max_sessions:,d} '
|
||||||
f'reached, stopping new connections until '
|
f'reached, stopping new connections until '
|
||||||
f'count drops to {low_watermark:,d}')
|
f'count drops to {low_watermark:,d}')
|
||||||
|
@ -252,7 +257,7 @@ class SessionManager:
|
||||||
# Start listening for incoming connections if paused and
|
# Start listening for incoming connections if paused and
|
||||||
# session count has fallen
|
# session count has fallen
|
||||||
if paused and len(self.sessions) <= low_watermark:
|
if paused and len(self.sessions) <= low_watermark:
|
||||||
self.bp.status_server.set_available()
|
self.on_available_callback()
|
||||||
self.logger.info('resuming listening for incoming connections')
|
self.logger.info('resuming listening for incoming connections')
|
||||||
await self._start_external_servers()
|
await self._start_external_servers()
|
||||||
paused = False
|
paused = False
|
||||||
|
@ -533,7 +538,7 @@ class SessionManager:
|
||||||
await self.start_other()
|
await self.start_other()
|
||||||
await self._start_external_servers()
|
await self._start_external_servers()
|
||||||
server_listening_event.set()
|
server_listening_event.set()
|
||||||
self.bp.status_server.set_available()
|
self.on_available_callback()
|
||||||
# Peer discovery should start after the external servers
|
# Peer discovery should start after the external servers
|
||||||
# because we connect to ourself
|
# because we connect to ourself
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
|
@ -628,8 +633,9 @@ class SessionManager:
|
||||||
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
||||||
self.mempool_statuses.pop(hashX, None)
|
self.mempool_statuses.pop(hashX, None)
|
||||||
|
|
||||||
|
# self.bp._chain_executor
|
||||||
await asyncio.get_event_loop().run_in_executor(
|
await asyncio.get_event_loop().run_in_executor(
|
||||||
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||||
)
|
)
|
||||||
|
|
||||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
if touched or new_touched or (height_changed and self.mempool_statuses):
|
||||||
|
@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.protocol_tuple = self.PROTOCOL_MIN
|
self.protocol_tuple = self.PROTOCOL_MIN
|
||||||
self.protocol_string = None
|
self.protocol_string = None
|
||||||
self.daemon = self.session_mgr.daemon
|
self.daemon = self.session_mgr.daemon
|
||||||
self.bp: BlockProcessor = self.session_mgr.bp
|
self.db: LevelDB = self.session_mgr.db
|
||||||
self.db: LevelDB = self.bp.db
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def protocol_min_max_strings(cls):
|
def protocol_min_max_strings(cls):
|
||||||
|
@ -1008,21 +1013,21 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||||
|
|
||||||
async def _cached_resolve_url(self, url):
|
async def _cached_resolve_url(self, url):
|
||||||
if url not in self.bp.resolve_cache:
|
if url not in self.session_mgr.resolve_cache:
|
||||||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||||
return self.bp.resolve_cache[url]
|
return self.session_mgr.resolve_cache[url]
|
||||||
|
|
||||||
async def claimtrie_resolve(self, *urls) -> str:
|
async def claimtrie_resolve(self, *urls) -> str:
|
||||||
sorted_urls = tuple(sorted(urls))
|
sorted_urls = tuple(sorted(urls))
|
||||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||||
try:
|
try:
|
||||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
if sorted_urls in self.session_mgr.resolve_outputs_cache:
|
||||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
return self.session_mgr.resolve_outputs_cache[sorted_urls]
|
||||||
rows, extra = [], []
|
rows, extra = [], []
|
||||||
for url in urls:
|
for url in urls:
|
||||||
if url not in self.bp.resolve_cache:
|
if url not in self.session_mgr.resolve_cache:
|
||||||
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||||
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
|
||||||
if isinstance(channel, ResolveCensoredError):
|
if isinstance(channel, ResolveCensoredError):
|
||||||
rows.append(channel)
|
rows.append(channel)
|
||||||
extra.append(channel.censor_row)
|
extra.append(channel.censor_row)
|
||||||
|
@ -1047,7 +1052,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
if reposted_channel:
|
if reposted_channel:
|
||||||
extra.append(reposted_channel)
|
extra.append(reposted_channel)
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||||
None, Outputs.to_base64, rows, extra, 0, None, None
|
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
|
@ -1055,7 +1060,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||||
|
|
||||||
async def get_server_height(self):
|
async def get_server_height(self):
|
||||||
return self.bp.height
|
return self.db.db_height
|
||||||
|
|
||||||
async def transaction_get_height(self, tx_hash):
|
async def transaction_get_height(self, tx_hash):
|
||||||
self.assert_tx_hash(tx_hash)
|
self.assert_tx_hash(tx_hash)
|
||||||
|
|
10
setup.py
10
setup.py
|
@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
|
||||||
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
|
with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
|
||||||
long_description = fh.read()
|
long_description = fh.read()
|
||||||
|
|
||||||
PLYVEL = []
|
|
||||||
if sys.platform.startswith('linux'):
|
ROCKSDB = []
|
||||||
PLYVEL.append('plyvel==1.3.0')
|
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
|
||||||
|
ROCKSDB.append('lbry-rocksdb==0.8.1')
|
||||||
|
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name=__name__,
|
name=__name__,
|
||||||
|
@ -57,7 +59,7 @@ setup(
|
||||||
'pylru==1.1.0',
|
'pylru==1.1.0',
|
||||||
'elasticsearch==7.10.1',
|
'elasticsearch==7.10.1',
|
||||||
'grpcio==1.38.0'
|
'grpcio==1.38.0'
|
||||||
] + PLYVEL,
|
] + ROCKSDB,
|
||||||
extras_require={
|
extras_require={
|
||||||
'torrent': ['lbry-libtorrent'],
|
'torrent': ['lbry-libtorrent'],
|
||||||
'lint': ['pylint==2.10.0'],
|
'lint': ['pylint==2.10.0'],
|
||||||
|
|
Loading…
Add table
Reference in a new issue