sleeps
This commit is contained in:
parent
e6d470f110
commit
50e00192e8
4 changed files with 61 additions and 63 deletions
|
@ -911,14 +911,17 @@ class LevelDB:
|
||||||
def get_block_txs(self, height: int) -> List[bytes]:
|
def get_block_txs(self, height: int) -> List[bytes]:
|
||||||
return self.prefix_db.block_txs.get(height).tx_hashes
|
return self.prefix_db.block_txs.get(height).tx_hashes
|
||||||
|
|
||||||
def _fs_transactions(self, txids: Iterable[str]):
|
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
||||||
tx_counts = self.tx_counts
|
|
||||||
tx_db_get = self.prefix_db.tx.get
|
|
||||||
tx_cache = self._tx_and_merkle_cache
|
|
||||||
tx_infos = {}
|
tx_infos = {}
|
||||||
|
for tx_hash in tx_hashes:
|
||||||
|
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
|
||||||
|
None, self._get_transaction_and_merkle, tx_hash
|
||||||
|
)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
return tx_infos
|
||||||
|
|
||||||
for tx_hash in txids:
|
def _get_transaction_and_merkle(self, tx_hash):
|
||||||
cached_tx = tx_cache.get(tx_hash)
|
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
|
||||||
if cached_tx:
|
if cached_tx:
|
||||||
tx, merkle = cached_tx
|
tx, merkle = cached_tx
|
||||||
else:
|
else:
|
||||||
|
@ -929,14 +932,14 @@ class LevelDB:
|
||||||
tx_num = None if not tx_num else tx_num.tx_num
|
tx_num = None if not tx_num else tx_num.tx_num
|
||||||
if tx_num is not None:
|
if tx_num is not None:
|
||||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||||
tx_height = bisect_right(tx_counts, tx_num)
|
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||||
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||||
if tx_height == -1:
|
if tx_height == -1:
|
||||||
merkle = {
|
merkle = {
|
||||||
'block_height': -1
|
'block_height': -1
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
tx_pos = tx_num - tx_counts[tx_height - 1]
|
tx_pos = tx_num - self.tx_counts[tx_height - 1]
|
||||||
branch, root = self.merkle.branch_and_root(
|
branch, root = self.merkle.branch_and_root(
|
||||||
self.get_block_txs(tx_height), tx_pos
|
self.get_block_txs(tx_height), tx_pos
|
||||||
)
|
)
|
||||||
|
@ -949,12 +952,8 @@ class LevelDB:
|
||||||
'pos': tx_pos
|
'pos': tx_pos
|
||||||
}
|
}
|
||||||
if tx_height + 10 < self.db_height:
|
if tx_height + 10 < self.db_height:
|
||||||
tx_cache[tx_hash] = tx, merkle
|
self._tx_and_merkle_cache[tx_hash] = tx, merkle
|
||||||
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
|
return (None if not tx else tx.hex(), merkle)
|
||||||
return tx_infos
|
|
||||||
|
|
||||||
async def fs_transactions(self, txids):
|
|
||||||
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
|
|
||||||
|
|
||||||
async def fs_block_hashes(self, height, count):
|
async def fs_block_hashes(self, height, count):
|
||||||
if height + count > len(self.headers):
|
if height + count > len(self.headers):
|
||||||
|
|
|
@ -548,6 +548,10 @@ class SessionManager:
|
||||||
self._clear_stale_sessions(),
|
self._clear_stale_sessions(),
|
||||||
self._manage_servers()
|
self._manage_servers()
|
||||||
])
|
])
|
||||||
|
except Exception as err:
|
||||||
|
if not isinstance(err, asyncio.CancelledError):
|
||||||
|
log.exception("hub server died")
|
||||||
|
raise err
|
||||||
finally:
|
finally:
|
||||||
await self._close_servers(list(self.servers.keys()))
|
await self._close_servers(list(self.servers.keys()))
|
||||||
log.warning("disconnect %i sessions", len(self.sessions))
|
log.warning("disconnect %i sessions", len(self.sessions))
|
||||||
|
@ -1015,16 +1019,16 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||||
return self.bp.resolve_cache[url]
|
return self.bp.resolve_cache[url]
|
||||||
|
|
||||||
async def claimtrie_resolve(self, *urls):
|
async def claimtrie_resolve(self, *urls) -> str:
|
||||||
sorted_urls = tuple(sorted(urls))
|
sorted_urls = tuple(sorted(urls))
|
||||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||||
|
try:
|
||||||
def _cached_resolve():
|
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||||
|
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||||
rows, extra = [], []
|
rows, extra = [], []
|
||||||
|
|
||||||
for url in urls:
|
for url in urls:
|
||||||
if url not in self.bp.resolve_cache:
|
if url not in self.bp.resolve_cache:
|
||||||
self.bp.resolve_cache[url] = self.db._resolve(url)
|
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||||
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
||||||
if isinstance(channel, ResolveCensoredError):
|
if isinstance(channel, ResolveCensoredError):
|
||||||
rows.append(channel)
|
rows.append(channel)
|
||||||
|
@ -1049,18 +1053,11 @@ class LBRYElectrumX(SessionBase):
|
||||||
extra.append(repost)
|
extra.append(repost)
|
||||||
if reposted_channel:
|
if reposted_channel:
|
||||||
extra.append(reposted_channel)
|
extra.append(reposted_channel)
|
||||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
await asyncio.sleep(0)
|
||||||
self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64(
|
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||||
rows, extra, 0, None, None
|
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||||
)
|
)
|
||||||
return serialized_outputs
|
return result
|
||||||
|
|
||||||
try:
|
|
||||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
|
||||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
|
||||||
else:
|
|
||||||
|
|
||||||
return await self.loop.run_in_executor(None, _cached_resolve)
|
|
||||||
finally:
|
finally:
|
||||||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||||
|
|
||||||
|
@ -1213,9 +1210,11 @@ class LBRYElectrumX(SessionBase):
|
||||||
address: the address to subscribe to"""
|
address: the address to subscribe to"""
|
||||||
if len(addresses) > 1000:
|
if len(addresses) > 1000:
|
||||||
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
||||||
return [
|
results = []
|
||||||
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
|
for address in addresses:
|
||||||
]
|
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
return results
|
||||||
|
|
||||||
async def address_unsubscribe(self, address):
|
async def address_unsubscribe(self, address):
|
||||||
"""Unsubscribe an address.
|
"""Unsubscribe an address.
|
||||||
|
@ -1464,7 +1463,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
||||||
for tx_hash in tx_hashes:
|
for tx_hash in tx_hashes:
|
||||||
assert_tx_hash(tx_hash)
|
assert_tx_hash(tx_hash)
|
||||||
batch_result = await self.db.fs_transactions(tx_hashes)
|
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
|
||||||
needed_merkles = {}
|
needed_merkles = {}
|
||||||
|
|
||||||
for tx_hash in tx_hashes:
|
for tx_hash in tx_hashes:
|
||||||
|
|
|
@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
|
||||||
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
||||||
|
|
||||||
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
||||||
txs = await bp.db.fs_transactions(txids)
|
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||||
|
|
|
@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
||||||
txids = [
|
txids = [
|
||||||
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
||||||
]
|
]
|
||||||
txs = await bp.db.fs_transactions(txids)
|
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||||
|
|
Loading…
Add table
Reference in a new issue