This commit is contained in:
Jack Robison 2021-10-14 18:57:46 -04:00
parent 99df418f1d
commit 7ea1a2b361
4 changed files with 61 additions and 63 deletions

View file

@ -911,50 +911,49 @@ class LevelDB:
def get_block_txs(self, height: int) -> List[bytes]:
return self.prefix_db.block_txs.get(height).tx_hashes
def _fs_transactions(self, txids: Iterable[str]):
tx_counts = self.tx_counts
tx_db_get = self.prefix_db.tx.get
tx_cache = self._tx_and_merkle_cache
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
tx_infos = {}
for tx_hash in txids:
cached_tx = tx_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(tx_counts, tx_num)
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
tx_cache[tx_hash] = tx, merkle
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
for tx_hash in tx_hashes:
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
None, self._get_transaction_and_merkle, tx_hash
)
await asyncio.sleep(0)
return tx_infos
async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
def _get_transaction_and_merkle(self, tx_hash):
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(self.tx_counts, tx_num)
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - self.tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
self._tx_and_merkle_cache[tx_hash] = tx, merkle
return (None if not tx else tx.hex(), merkle)
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):

View file

@ -548,6 +548,10 @@ class SessionManager:
self._clear_stale_sessions(),
self._manage_servers()
])
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
log.exception("hub server died")
raise err
finally:
await self._close_servers(list(self.servers.keys()))
log.warning("disconnect %i sessions", len(self.sessions))
@ -1015,16 +1019,16 @@ class LBRYElectrumX(SessionBase):
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url]
async def claimtrie_resolve(self, *urls):
async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
def _cached_resolve():
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = self.db._resolve(url)
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
@ -1049,18 +1053,11 @@ class LBRYElectrumX(SessionBase):
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64(
rows, extra, 0, None, None
await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None
)
return serialized_outputs
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
else:
return await self.loop.run_in_executor(None, _cached_resolve)
return result
finally:
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
@ -1213,9 +1210,11 @@ class LBRYElectrumX(SessionBase):
address: the address to subscribe to"""
if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
return [
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
]
results = []
for address in addresses:
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0)
return results
async def address_unsubscribe(self, address):
"""Unsubscribe an address.
@ -1464,7 +1463,7 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
for tx_hash in tx_hashes:
assert_tx_hash(tx_hash)
batch_result = await self.db.fs_transactions(tx_hashes)
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
needed_merkles = {}
for tx_hash in tx_hashes:

View file

@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids)
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')

View file

@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
]
txs = await bp.db.fs_transactions(txids)
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')