disable sqlite in block processor

This commit is contained in:
Jack Robison 2021-02-19 13:18:38 -05:00
parent 2c8ceb1217
commit 6988a47e02
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 89 additions and 36 deletions

View file

@ -802,8 +802,8 @@ class LBRYBlockProcessor(BlockProcessor):
self.prefetcher.polling_delay = 0.5
self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False)
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor')
# self.sql: SQLDB = self.db.sql
# self.timer = Timer('BlockProcessor')
def advance_blocks(self, blocks):
if self.sql:

View file

@ -14,7 +14,7 @@ from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
from lbry.wallet.server.db.writer import LBRYLevelDB
# from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.block_processor import LBRYBlockProcessor
@ -240,7 +240,7 @@ class LBC(Coin):
BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager
DESERIALIZER = DeserializerSegWit
DB = LBRYLevelDB
DB = LevelDB
NAME = "LBRY"
SHORTNAME = "LBC"
NET = "mainnet"

View file

@ -23,8 +23,9 @@ from prometheus_client import Counter, Info, Histogram, Gauge
import lbry
from lbry.error import TooManyClaimSearchParametersError
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.schema.result import Outputs
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.rpc.framing import NewlineFramer
@ -175,7 +176,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
def __init__(self, env: 'Env', db: LevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send)
self.env = env
@ -812,21 +813,21 @@ class LBRYSessionManager(SessionManager):
super().__init__(*args, **kwargs)
self.query_executor = None
self.websocket = None
self.metrics = ServerLoadData()
# self.metrics = ServerLoadData()
self.metrics_loop = None
self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self)
async def process_metrics(self):
while self.running:
data = self.metrics.to_json_and_reset({
'sessions': self.session_count(),
'height': self.db.db_height,
})
if self.websocket is not None:
self.websocket.send_message(data)
await asyncio.sleep(1)
# async def process_metrics(self):
# while self.running:
# data = self.metrics.to_json_and_reset({
# 'sessions': self.session_count(),
# 'height': self.db.db_height,
# })
# if self.websocket is not None:
# self.websocket.send_message(data)
# await asyncio.sleep(1)
async def start_other(self):
self.running = True
@ -838,13 +839,9 @@ class LBRYSessionManager(SessionManager):
)
if self.websocket is not None:
await self.websocket.start()
if self.env.track_metrics:
self.metrics_loop = asyncio.create_task(self.process_metrics())
async def stop_other(self):
self.running = False
if self.env.track_metrics:
self.metrics_loop.cancel()
if self.websocket is not None:
await self.websocket.stop()
self.query_executor.shutdown()
@ -887,6 +884,7 @@ class LBRYElectrumX(SessionBase):
'blockchain.transaction.get_height': cls.transaction_get_height,
'blockchain.claimtrie.search': cls.claimtrie_search,
'blockchain.claimtrie.resolve': cls.claimtrie_resolve,
'blockchain.claimtrie.getclaimsbyids': cls.claimtrie_getclaimsbyids,
'blockchain.block.get_server_height': cls.get_server_height,
'mempool.get_fee_histogram': cls.mempool_compact_histogram,
'blockchain.block.headers': cls.block_headers,
@ -916,7 +914,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_string = None
self.daemon = self.session_mgr.daemon
self.bp: LBRYBlockProcessor = self.session_mgr.bp
self.db: LBRYLevelDB = self.bp.db
self.db: LevelDB = self.bp.db
@classmethod
def protocol_min_max_strings(cls):
@ -973,15 +971,15 @@ class LBRYElectrumX(SessionBase):
finally:
self.session_mgr.notifications_in_flight_metric.dec()
def get_metrics_or_placeholder_for_api(self, query_name):
""" Do not hold on to a reference to the metrics
returned by this method past an `await` or
you may be working with a stale metrics object.
"""
if self.env.track_metrics:
return self.session_mgr.metrics.for_api(query_name)
else:
return APICallMetrics(query_name)
# def get_metrics_or_placeholder_for_api(self, query_name):
# """ Do not hold on to a reference to the metrics
# returned by this method past an `await` or
# you may be working with a stale metrics object.
# """
# if self.env.track_metrics:
# # return self.session_mgr.metrics.for_api(query_name)
# else:
# return APICallMetrics(query_name)
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
@ -994,15 +992,9 @@ class LBRYElectrumX(SessionBase):
raise
except Exception:
log.exception("dear devs, please handle this exception better")
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, {})
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
if self.env.track_metrics:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
(result, metrics_data) = result
metrics.query_response(start, metrics_data)
return base64.b64encode(result).decode()
finally:
self.session_mgr.pending_query_metric.dec()
@ -1057,6 +1049,67 @@ class LBRYElectrumX(SessionBase):
return -1
return None
async def claimtrie_getclaimsbyids(self, *claim_ids):
claims = await self.batched_formatted_claims_from_daemon(claim_ids)
return dict(zip(claim_ids, claims))
async def batched_formatted_claims_from_daemon(self, claim_ids):
claims = await self.daemon.getclaimsbyids(claim_ids)
result = []
for claim in claims:
if claim and claim.get('value'):
result.append(self.format_claim_from_daemon(claim))
return result
def format_claim_from_daemon(self, claim, name=None):
"""Changes the returned claim data to the format expected by lbry and adds missing fields."""
if not claim:
return {}
# this ISO-8859 nonsense stems from a nasty form of encoding extended characters in lbrycrd
# it will be fixed after the lbrycrd upstream merge to v17 is done
# it originated as a fear of terminals not supporting unicode. alas, they all do
if 'name' in claim:
name = claim['name'].encode('ISO-8859-1').decode()
info = self.db.sql.get_claims(claim_id=claim['claimId'])
if not info:
# raise RPCError("Lbrycrd has {} but not lbryumx, please submit a bug report.".format(claim_id))
return {}
address = info.address.decode()
# fixme: temporary
#supports = self.format_supports_from_daemon(claim.get('supports', []))
supports = []
amount = get_from_possible_keys(claim, 'amount', 'nAmount')
height = get_from_possible_keys(claim, 'height', 'nHeight')
effective_amount = get_from_possible_keys(claim, 'effective amount', 'nEffectiveAmount')
valid_at_height = get_from_possible_keys(claim, 'valid at height', 'nValidAtHeight')
result = {
"name": name,
"claim_id": claim['claimId'],
"txid": claim['txid'],
"nout": claim['n'],
"amount": amount,
"depth": self.db.db_height - height + 1,
"height": height,
"value": hexlify(claim['value'].encode('ISO-8859-1')).decode(),
"address": address, # from index
"supports": supports,
"effective_amount": effective_amount,
"valid_at_height": valid_at_height
}
if 'claim_sequence' in claim:
# TODO: ensure that lbrycrd #209 fills in this value
result['claim_sequence'] = claim['claim_sequence']
else:
result['claim_sequence'] = -1
if 'normalized_name' in claim:
result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode()
return result
def assert_tx_hash(self, value):
'''Raise an RPCError if the value is not a valid transaction
hash.'''