disable sqlite in block processor

This commit is contained in:
Jack Robison 2021-02-19 13:18:38 -05:00 committed by Victor Shyba
parent 3e826d0a5d
commit ef3bab16d3
3 changed files with 89 additions and 36 deletions

View file

@ -802,8 +802,8 @@ class LBRYBlockProcessor(BlockProcessor):
self.prefetcher.polling_delay = 0.5 self.prefetcher.polling_delay = 0.5
self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False) self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False)
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql # self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor') # self.timer = Timer('BlockProcessor')
def advance_blocks(self, blocks): def advance_blocks(self, blocks):
if self.sql: if self.sql:

View file

@ -14,7 +14,7 @@ from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
from lbry.wallet.server.db.writer import LBRYLevelDB # from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.block_processor import LBRYBlockProcessor
@ -240,7 +240,7 @@ class LBC(Coin):
BLOCK_PROCESSOR = LBRYBlockProcessor BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager SESSION_MANAGER = LBRYSessionManager
DESERIALIZER = DeserializerSegWit DESERIALIZER = DeserializerSegWit
DB = LBRYLevelDB DB = LevelDB
NAME = "LBRY" NAME = "LBRY"
SHORTNAME = "LBC" SHORTNAME = "LBC"
NET = "mainnet" NET = "mainnet"

View file

@ -23,8 +23,9 @@ from prometheus_client import Counter, Info, Histogram, Gauge
import lbry import lbry
from lbry.error import TooManyClaimSearchParametersError from lbry.error import TooManyClaimSearchParametersError
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.schema.result import Outputs
from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYLevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.rpc.framing import NewlineFramer from lbry.wallet.rpc.framing import NewlineFramer
@ -175,7 +176,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', def __init__(self, env: 'Env', db: LevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event): shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
@ -812,21 +813,21 @@ class LBRYSessionManager(SessionManager):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.query_executor = None self.query_executor = None
self.websocket = None self.websocket = None
self.metrics = ServerLoadData() # self.metrics = ServerLoadData()
self.metrics_loop = None self.metrics_loop = None
self.running = False self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None: if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self) self.websocket = AdminWebSocket(self)
async def process_metrics(self): # async def process_metrics(self):
while self.running: # while self.running:
data = self.metrics.to_json_and_reset({ # data = self.metrics.to_json_and_reset({
'sessions': self.session_count(), # 'sessions': self.session_count(),
'height': self.db.db_height, # 'height': self.db.db_height,
}) # })
if self.websocket is not None: # if self.websocket is not None:
self.websocket.send_message(data) # self.websocket.send_message(data)
await asyncio.sleep(1) # await asyncio.sleep(1)
async def start_other(self): async def start_other(self):
self.running = True self.running = True
@ -838,13 +839,9 @@ class LBRYSessionManager(SessionManager):
) )
if self.websocket is not None: if self.websocket is not None:
await self.websocket.start() await self.websocket.start()
if self.env.track_metrics:
self.metrics_loop = asyncio.create_task(self.process_metrics())
async def stop_other(self): async def stop_other(self):
self.running = False self.running = False
if self.env.track_metrics:
self.metrics_loop.cancel()
if self.websocket is not None: if self.websocket is not None:
await self.websocket.stop() await self.websocket.stop()
self.query_executor.shutdown() self.query_executor.shutdown()
@ -887,6 +884,7 @@ class LBRYElectrumX(SessionBase):
'blockchain.transaction.get_height': cls.transaction_get_height, 'blockchain.transaction.get_height': cls.transaction_get_height,
'blockchain.claimtrie.search': cls.claimtrie_search, 'blockchain.claimtrie.search': cls.claimtrie_search,
'blockchain.claimtrie.resolve': cls.claimtrie_resolve, 'blockchain.claimtrie.resolve': cls.claimtrie_resolve,
'blockchain.claimtrie.getclaimsbyids': cls.claimtrie_getclaimsbyids,
'blockchain.block.get_server_height': cls.get_server_height, 'blockchain.block.get_server_height': cls.get_server_height,
'mempool.get_fee_histogram': cls.mempool_compact_histogram, 'mempool.get_fee_histogram': cls.mempool_compact_histogram,
'blockchain.block.headers': cls.block_headers, 'blockchain.block.headers': cls.block_headers,
@ -916,7 +914,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_string = None self.protocol_string = None
self.daemon = self.session_mgr.daemon self.daemon = self.session_mgr.daemon
self.bp: LBRYBlockProcessor = self.session_mgr.bp self.bp: LBRYBlockProcessor = self.session_mgr.bp
self.db: LBRYLevelDB = self.bp.db self.db: LevelDB = self.bp.db
@classmethod @classmethod
def protocol_min_max_strings(cls): def protocol_min_max_strings(cls):
@ -973,15 +971,15 @@ class LBRYElectrumX(SessionBase):
finally: finally:
self.session_mgr.notifications_in_flight_metric.dec() self.session_mgr.notifications_in_flight_metric.dec()
def get_metrics_or_placeholder_for_api(self, query_name): # def get_metrics_or_placeholder_for_api(self, query_name):
""" Do not hold on to a reference to the metrics # """ Do not hold on to a reference to the metrics
returned by this method past an `await` or # returned by this method past an `await` or
you may be working with a stale metrics object. # you may be working with a stale metrics object.
""" # """
if self.env.track_metrics: # if self.env.track_metrics:
return self.session_mgr.metrics.for_api(query_name) # # return self.session_mgr.metrics.for_api(query_name)
else: # else:
return APICallMetrics(query_name) # return APICallMetrics(query_name)
async def run_in_executor(self, query_name, func, kwargs): async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter() start = time.perf_counter()
@ -994,15 +992,9 @@ class LBRYElectrumX(SessionBase):
raise raise
except Exception: except Exception:
log.exception("dear devs, please handle this exception better") log.exception("dear devs, please handle this exception better")
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.query_error(start, {})
self.session_mgr.db_error_metric.inc() self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else: else:
if self.env.track_metrics:
metrics = self.get_metrics_or_placeholder_for_api(query_name)
(result, metrics_data) = result
metrics.query_response(start, metrics_data)
return base64.b64encode(result).decode() return base64.b64encode(result).decode()
finally: finally:
self.session_mgr.pending_query_metric.dec() self.session_mgr.pending_query_metric.dec()
@ -1057,6 +1049,67 @@ class LBRYElectrumX(SessionBase):
return -1 return -1
return None return None
async def claimtrie_getclaimsbyids(self, *claim_ids):
claims = await self.batched_formatted_claims_from_daemon(claim_ids)
return dict(zip(claim_ids, claims))
async def batched_formatted_claims_from_daemon(self, claim_ids):
claims = await self.daemon.getclaimsbyids(claim_ids)
result = []
for claim in claims:
if claim and claim.get('value'):
result.append(self.format_claim_from_daemon(claim))
return result
def format_claim_from_daemon(self, claim, name=None):
"""Changes the returned claim data to the format expected by lbry and adds missing fields."""
if not claim:
return {}
# this ISO-8859 nonsense stems from a nasty form of encoding extended characters in lbrycrd
# it will be fixed after the lbrycrd upstream merge to v17 is done
# it originated as a fear of terminals not supporting unicode. alas, they all do
if 'name' in claim:
name = claim['name'].encode('ISO-8859-1').decode()
info = self.db.sql.get_claims(claim_id=claim['claimId'])
if not info:
# raise RPCError("Lbrycrd has {} but not lbryumx, please submit a bug report.".format(claim_id))
return {}
address = info.address.decode()
# fixme: temporary
#supports = self.format_supports_from_daemon(claim.get('supports', []))
supports = []
amount = get_from_possible_keys(claim, 'amount', 'nAmount')
height = get_from_possible_keys(claim, 'height', 'nHeight')
effective_amount = get_from_possible_keys(claim, 'effective amount', 'nEffectiveAmount')
valid_at_height = get_from_possible_keys(claim, 'valid at height', 'nValidAtHeight')
result = {
"name": name,
"claim_id": claim['claimId'],
"txid": claim['txid'],
"nout": claim['n'],
"amount": amount,
"depth": self.db.db_height - height + 1,
"height": height,
"value": hexlify(claim['value'].encode('ISO-8859-1')).decode(),
"address": address, # from index
"supports": supports,
"effective_amount": effective_amount,
"valid_at_height": valid_at_height
}
if 'claim_sequence' in claim:
# TODO: ensure that lbrycrd #209 fills in this value
result['claim_sequence'] = claim['claim_sequence']
else:
result['claim_sequence'] = -1
if 'normalized_name' in claim:
result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode()
return result
def assert_tx_hash(self, value): def assert_tx_hash(self, value):
'''Raise an RPCError if the value is not a valid transaction '''Raise an RPCError if the value is not a valid transaction
hash.''' hash.'''