forked from LBRYCommunity/lbry-sdk
sleeps
This commit is contained in:
parent
8bfff2d549
commit
b2de89ca29
4 changed files with 61 additions and 63 deletions
|
@ -911,50 +911,49 @@ class LevelDB:
|
|||
def get_block_txs(self, height: int) -> List[bytes]:
|
||||
return self.prefix_db.block_txs.get(height).tx_hashes
|
||||
|
||||
def _fs_transactions(self, txids: Iterable[str]):
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.prefix_db.tx.get
|
||||
tx_cache = self._tx_and_merkle_cache
|
||||
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
||||
tx_infos = {}
|
||||
|
||||
for tx_hash in txids:
|
||||
cached_tx = tx_cache.get(tx_hash)
|
||||
if cached_tx:
|
||||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
tx_num = None if not tx_num else tx_num.tx_num
|
||||
if tx_num is not None:
|
||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||
tx_height = bisect_right(tx_counts, tx_num)
|
||||
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
}
|
||||
else:
|
||||
tx_pos = tx_num - tx_counts[tx_height - 1]
|
||||
branch, root = self.merkle.branch_and_root(
|
||||
self.get_block_txs(tx_height), tx_pos
|
||||
)
|
||||
merkle = {
|
||||
'block_height': tx_height,
|
||||
'merkle': [
|
||||
hash_to_hex_str(hash)
|
||||
for hash in branch
|
||||
],
|
||||
'pos': tx_pos
|
||||
}
|
||||
if tx_height + 10 < self.db_height:
|
||||
tx_cache[tx_hash] = tx, merkle
|
||||
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
|
||||
for tx_hash in tx_hashes:
|
||||
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
|
||||
None, self._get_transaction_and_merkle, tx_hash
|
||||
)
|
||||
await asyncio.sleep(0)
|
||||
return tx_infos
|
||||
|
||||
async def fs_transactions(self, txids):
|
||||
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
|
||||
def _get_transaction_and_merkle(self, tx_hash):
|
||||
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
|
||||
if cached_tx:
|
||||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
tx_num = None if not tx_num else tx_num.tx_num
|
||||
if tx_num is not None:
|
||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
}
|
||||
else:
|
||||
tx_pos = tx_num - self.tx_counts[tx_height - 1]
|
||||
branch, root = self.merkle.branch_and_root(
|
||||
self.get_block_txs(tx_height), tx_pos
|
||||
)
|
||||
merkle = {
|
||||
'block_height': tx_height,
|
||||
'merkle': [
|
||||
hash_to_hex_str(hash)
|
||||
for hash in branch
|
||||
],
|
||||
'pos': tx_pos
|
||||
}
|
||||
if tx_height + 10 < self.db_height:
|
||||
self._tx_and_merkle_cache[tx_hash] = tx, merkle
|
||||
return (None if not tx else tx.hex(), merkle)
|
||||
|
||||
async def fs_block_hashes(self, height, count):
|
||||
if height + count > len(self.headers):
|
||||
|
|
|
@ -548,6 +548,10 @@ class SessionManager:
|
|||
self._clear_stale_sessions(),
|
||||
self._manage_servers()
|
||||
])
|
||||
except Exception as err:
|
||||
if not isinstance(err, asyncio.CancelledError):
|
||||
log.exception("hub server died")
|
||||
raise err
|
||||
finally:
|
||||
await self._close_servers(list(self.servers.keys()))
|
||||
log.warning("disconnect %i sessions", len(self.sessions))
|
||||
|
@ -1015,16 +1019,16 @@ class LBRYElectrumX(SessionBase):
|
|||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.bp.resolve_cache[url]
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
async def claimtrie_resolve(self, *urls) -> str:
|
||||
sorted_urls = tuple(sorted(urls))
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||
|
||||
def _cached_resolve():
|
||||
try:
|
||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||
rows, extra = [], []
|
||||
|
||||
for url in urls:
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = self.db._resolve(url)
|
||||
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
|
@ -1049,18 +1053,11 @@ class LBRYElectrumX(SessionBase):
|
|||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
||||
self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64(
|
||||
rows, extra, 0, None, None
|
||||
await asyncio.sleep(0)
|
||||
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||
)
|
||||
return serialized_outputs
|
||||
|
||||
try:
|
||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||
else:
|
||||
|
||||
return await self.loop.run_in_executor(None, _cached_resolve)
|
||||
return result
|
||||
finally:
|
||||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||
|
||||
|
@ -1213,9 +1210,11 @@ class LBRYElectrumX(SessionBase):
|
|||
address: the address to subscribe to"""
|
||||
if len(addresses) > 1000:
|
||||
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
||||
return [
|
||||
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
|
||||
]
|
||||
results = []
|
||||
for address in addresses:
|
||||
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
|
||||
await asyncio.sleep(0)
|
||||
return results
|
||||
|
||||
async def address_unsubscribe(self, address):
|
||||
"""Unsubscribe an address.
|
||||
|
@ -1464,7 +1463,7 @@ class LBRYElectrumX(SessionBase):
|
|||
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
||||
for tx_hash in tx_hashes:
|
||||
assert_tx_hash(tx_hash)
|
||||
batch_result = await self.db.fs_transactions(tx_hashes)
|
||||
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
|
||||
needed_merkles = {}
|
||||
|
||||
for tx_hash in tx_hashes:
|
||||
|
|
|
@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
|
|||
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
||||
|
||||
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
||||
txs = await bp.db.fs_transactions(txids)
|
||||
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||
|
|
|
@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
|||
txids = [
|
||||
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
||||
]
|
||||
txs = await bp.db.fs_transactions(txids)
|
||||
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||
|
|
Loading…
Add table
Reference in a new issue