remove scribe-hub getrawtransaction calls, improve notifications

This commit is contained in:
Jack Robison 2022-04-04 10:36:30 -04:00
parent 32a2e92993
commit 4936bd43b6
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 53 additions and 56 deletions

View file

@ -853,6 +853,15 @@ class HubDB:
return self.total_transactions[tx_num]
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]:
return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False)
def get_raw_confirmed_tx(self, tx_hash: bytes) -> Optional[bytes]:
return self.prefix_db.tx.get(tx_hash, deserialize_value=False)
def get_raw_tx(self, tx_hash: bytes) -> Optional[bytes]:
return self.get_raw_mempool_tx(tx_hash) or self.get_raw_confirmed_tx(tx_hash)
def get_tx_num(self, tx_hash: bytes) -> int:
if self._cache_all_tx_hashes:
return self.tx_num_mapping[tx_hash]
@ -973,13 +982,14 @@ class HubDB:
if self._cache_all_claim_txos:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
else:
fill_cache = False
fill_cache = True
tx_height = bisect_right(self.tx_counts, tx_num)
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
tx = self.prefix_db.mempool_tx.get(tx_hash_bytes, deserialize_value=False)
else:
tx_pos = tx_num - self.tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(

View file

@ -52,6 +52,7 @@ class MemPool:
self.refresh_secs = refresh_secs
self.mempool_process_time_metric = mempool_process_time_metric
self.session_manager: typing.Optional['SessionManager'] = None
self._notification_q = asyncio.Queue()
def refresh(self) -> typing.Set[bytes]: # returns list of new touched hashXs
prefix_db = self._db.prefix_db
@ -211,6 +212,20 @@ class MemPool:
async def on_block(self, touched, height):
await self._notify_sessions(height, touched, set())
async def send_notifications_forever(self, started):
started.set()
while True:
(session_id, height_changed, hashXes) = await self._notification_q.get()
session = self.session_manager.sessions.get(session_id)
if session:
if session.subscribe_headers and height_changed:
asyncio.create_task(
session.send_notification('blockchain.headers.subscribe',
(self.session_manager.hsub_results[session.subscribe_headers_raw],))
)
if hashXes:
asyncio.create_task(session.send_history_notifications(*hashXes))
async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses."""
height_changed = height != self.session_manager.notified_height
@ -221,18 +236,6 @@ class MemPool:
return
if height_changed:
notified = 0
for session in self.session_manager.sessions.values():
if session.subscribe_headers:
notified += 1
asyncio.create_task(
session.send_notification('blockchain.headers.subscribe',
(self.session_manager.hsub_results[session.subscribe_headers_raw], ))
)
if notified % 10 == 0:
await asyncio.sleep(0) # break up the loop somewhat, there can be many headers notifications
if notified:
self.logger.info(f'queued notify {notified} sessions of new header')
for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
self.session_manager.mempool_statuses.pop(hashX, None)
# self.bp._chain_executor
@ -240,9 +243,11 @@ class MemPool:
self._db._executor, touched.intersection_update, self.session_manager.hashx_subscriptions_by_session.keys()
)
session_hashxes_to_notify = defaultdict(list)
notified_hashxs = 0
sent_headers = 0
if touched or new_touched or (height_changed and self.session_manager.mempool_statuses):
notified_hashxs = 0
session_hashxes_to_notify = defaultdict(list)
to_notify = touched if height_changed else new_touched
for hashX in to_notify:
@ -251,7 +256,18 @@ class MemPool:
for session_id in self.session_manager.hashx_subscriptions_by_session[hashX]:
session_hashxes_to_notify[session_id].append(hashX)
notified_hashxs += 1
for session_id, hashXes in session_hashxes_to_notify.items():
asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
if session_hashxes_to_notify:
self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
for session_id, session in self.session_manager.sessions.items():
hashXes = None
if session_id in session_hashxes_to_notify:
hashXes = session_hashxes_to_notify[session_id]
elif not session.subscribe_headers:
continue
if session.subscribe_headers and height_changed:
sent_headers += 1
self._notification_q.put_nowait((session_id, height_changed, hashXes))
if sent_headers:
self.logger.info(f'notified {sent_headers} sessions of new block header')
if session_hashxes_to_notify:
self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')

View file

@ -94,6 +94,7 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self):
yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height)

View file

@ -1666,53 +1666,23 @@ class LBRYElectrumX(asyncio.Protocol):
for tx_hash in tx_hashes:
assert_tx_hash(tx_hash)
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
needed_merkles = {}
for tx_hash in tx_hashes:
if tx_hash in batch_result and batch_result[tx_hash][0]:
continue
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
mempool_tx = self.mempool.txs.get(tx_hash_bytes, None)
if mempool_tx:
raw_tx, block_hash = mempool_tx.raw_tx.hex(), None
else:
tx_info = await self.daemon_request('getrawtransaction', tx_hash, 1)
raw_tx = tx_info['hex']
block_hash = tx_info.get('blockhash')
if block_hash:
block = await self.daemon.deserialised_block(block_hash)
height = block['height']
try:
pos = block['tx'].index(tx_hash)
except ValueError:
raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
f'block {block_hash} at height {height:,d}')
needed_merkles[tx_hash] = raw_tx, block['tx'], pos, height
else:
batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
if needed_merkles:
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
batch_result[tx_hash] = raw_tx, {
'merkle': self._get_merkle_branch(block_txs, pos),
'pos': pos,
'block_height': block_height
}
await asyncio.sleep(0) # heavy call, give other tasks a chance
self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
return batch_result
async def transaction_get(self, tx_hash, verbose=False):
async def transaction_get(self, txid: str, verbose=False):
"""Return the serialized raw transaction given its hash
tx_hash: the transaction hash as a hexadecimal string
txid: the transaction hash as a hexadecimal string
verbose: passed on to the daemon
"""
assert_tx_hash(tx_hash)
assert_tx_hash(txid)
if verbose not in (True, False):
raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean')
tx_hash_bytes = bytes.fromhex(txid)[::-1]
return await self.daemon_request('getrawtransaction', tx_hash, int(verbose))
raw_tx = await asyncio.get_event_loop().run_in_executor(None, self.db.get_raw_tx, tx_hash_bytes)
if raw_tx:
return raw_tx.hex()
def _get_merkle_branch(self, tx_hashes, tx_pos):
"""Return a merkle branch to a transaction.