diff --git a/scribe/db/db.py b/scribe/db/db.py index a8ee6e2..0d0720a 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -853,6 +853,15 @@ class HubDB: return self.total_transactions[tx_num] return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) + def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]: + return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False) + + def get_raw_confirmed_tx(self, tx_hash: bytes) -> Optional[bytes]: + return self.prefix_db.tx.get(tx_hash, deserialize_value=False) + + def get_raw_tx(self, tx_hash: bytes) -> Optional[bytes]: + return self.get_raw_mempool_tx(tx_hash) or self.get_raw_confirmed_tx(tx_hash) + def get_tx_num(self, tx_hash: bytes) -> int: if self._cache_all_tx_hashes: return self.tx_num_mapping[tx_hash] @@ -973,13 +982,14 @@ class HubDB: if self._cache_all_claim_txos: fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 else: - fill_cache = False + fill_cache = True tx_height = bisect_right(self.tx_counts, tx_num) tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) if tx_height == -1: merkle = { 'block_height': -1 } + tx = self.prefix_db.mempool_tx.get(tx_hash_bytes, deserialize_value=False) else: tx_pos = tx_num - self.tx_counts[tx_height - 1] branch, root = self.merkle.branch_and_root( diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index d556eea..c31f47e 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -52,6 +52,7 @@ class MemPool: self.refresh_secs = refresh_secs self.mempool_process_time_metric = mempool_process_time_metric self.session_manager: typing.Optional['SessionManager'] = None + self._notification_q = asyncio.Queue() def refresh(self) -> typing.Set[bytes]: # returns list of new touched hashXs prefix_db = self._db.prefix_db @@ -211,6 +212,20 @@ class MemPool: async def on_block(self, touched, height): await self._notify_sessions(height, touched, set()) + async def send_notifications_forever(self, started): + started.set() + while True: + (session_id, height_changed, hashXes) = await self._notification_q.get() + session = self.session_manager.sessions.get(session_id) + if session: + if session.subscribe_headers and height_changed: + asyncio.create_task( + session.send_notification('blockchain.headers.subscribe', + (self.session_manager.hsub_results[session.subscribe_headers_raw],)) + ) + if hashXes: + asyncio.create_task(session.send_history_notifications(*hashXes)) + async def _notify_sessions(self, height, touched, new_touched): """Notify sessions about height changes and touched addresses.""" height_changed = height != self.session_manager.notified_height @@ -221,18 +236,6 @@ class MemPool: return if height_changed: - notified = 0 - for session in self.session_manager.sessions.values(): - if session.subscribe_headers: - notified += 1 - asyncio.create_task( - session.send_notification('blockchain.headers.subscribe', - (self.session_manager.hsub_results[session.subscribe_headers_raw], )) - ) - if notified % 10 == 0: - await asyncio.sleep(0) # break up the loop somewhat, there can be many headers notifications - if notified: - self.logger.info(f'queued notify {notified} sessions of new header') for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()): self.session_manager.mempool_statuses.pop(hashX, None) # self.bp._chain_executor @@ -240,9 +243,11 @@ class MemPool: self._db._executor, touched.intersection_update, self.session_manager.hashx_subscriptions_by_session.keys() ) + session_hashxes_to_notify = defaultdict(list) + notified_hashxs = 0 + sent_headers = 0 + if touched or new_touched or (height_changed and self.session_manager.mempool_statuses): - notified_hashxs = 0 - session_hashxes_to_notify = defaultdict(list) to_notify = touched if height_changed else new_touched for hashX in to_notify: @@ -251,7 +256,18 @@ class MemPool: for session_id in self.session_manager.hashx_subscriptions_by_session[hashX]: session_hashxes_to_notify[session_id].append(hashX) notified_hashxs += 1 - for session_id, hashXes in session_hashxes_to_notify.items(): - asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes)) - if session_hashxes_to_notify: - self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') + + for session_id, session in self.session_manager.sessions.items(): + hashXes = None + if session_id in session_hashxes_to_notify: + hashXes = session_hashxes_to_notify[session_id] + elif not session.subscribe_headers: + continue + if session.subscribe_headers and height_changed: + sent_headers += 1 + self._notification_q.put_nowait((session_id, height_changed, hashXes)) + + if sent_headers: + self.logger.info(f'notified {sent_headers} sessions of new block header') + if session_hashxes_to_notify: + self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') diff --git a/scribe/hub/service.py b/scribe/hub/service.py index c459a05..cb7fcd7 100644 --- a/scribe/hub/service.py +++ b/scribe/hub/service.py @@ -94,6 +94,7 @@ class HubServerService(BlockchainReaderService): def _iter_start_tasks(self): yield self.start_status_server() yield self.start_cancellable(self.es_notification_client.maintain_connection) + yield self.start_cancellable(self.mempool.send_notifications_forever) yield self.start_cancellable(self.refresh_blocks_forever) yield self.finished_initial_catch_up.wait() self.block_count_metric.set(self.last_state.height) diff --git a/scribe/hub/session.py b/scribe/hub/session.py index f539eb2..da78f9b 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -1666,53 +1666,23 @@ class LBRYElectrumX(asyncio.Protocol): for tx_hash in tx_hashes: assert_tx_hash(tx_hash) batch_result = await self.db.get_transactions_and_merkles(tx_hashes) - needed_merkles = {} - - for tx_hash in tx_hashes: - if tx_hash in batch_result and batch_result[tx_hash][0]: - continue - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - mempool_tx = self.mempool.txs.get(tx_hash_bytes, None) - if mempool_tx: - raw_tx, block_hash = mempool_tx.raw_tx.hex(), None - else: - tx_info = await self.daemon_request('getrawtransaction', tx_hash, 1) - raw_tx = tx_info['hex'] - block_hash = tx_info.get('blockhash') - if block_hash: - block = await self.daemon.deserialised_block(block_hash) - height = block['height'] - try: - pos = block['tx'].index(tx_hash) - except ValueError: - raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in ' - f'block {block_hash} at height {height:,d}') - needed_merkles[tx_hash] = raw_tx, block['tx'], pos, height - else: - batch_result[tx_hash] = [raw_tx, {'block_height': -1}] - - if needed_merkles: - for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items(): - batch_result[tx_hash] = raw_tx, { - 'merkle': self._get_merkle_branch(block_txs, pos), - 'pos': pos, - 'block_height': block_height - } - await asyncio.sleep(0) # heavy call, give other tasks a chance self.session_manager.tx_replied_count_metric.inc(len(tx_hashes)) return batch_result - async def transaction_get(self, tx_hash, verbose=False): + async def transaction_get(self, txid: str, verbose=False): """Return the serialized raw transaction given its hash - tx_hash: the transaction hash as a hexadecimal string + txid: the transaction hash as a hexadecimal string verbose: passed on to the daemon """ - assert_tx_hash(tx_hash) + assert_tx_hash(txid) if verbose not in (True, False): raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean') + tx_hash_bytes = bytes.fromhex(txid)[::-1] - return await self.daemon_request('getrawtransaction', tx_hash, int(verbose)) + raw_tx = await asyncio.get_event_loop().run_in_executor(None, self.db.get_raw_tx, tx_hash_bytes) + if raw_tx: + return raw_tx.hex() def _get_merkle_branch(self, tx_hashes, tx_pos): """Return a merkle branch to a transaction.