diff --git a/lbry/db/database.py b/lbry/db/database.py index 9e965095d..e3e95188f 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -260,8 +260,8 @@ class Database: async def get_missing_tx_for_addresses(self, address_manager): return await self.run(q.get_missing_tx_for_addresses, address_manager) - async def insert_block(self, block): - return await self.run(q.insert_block, block) + async def insert_blocks(self, blocks): + return await self.run(q.insert_blocks, blocks) async def insert_block_filter(self, height: int, factor: int, address_filter: bytes): return await self.run(q.insert_block_filter, height, factor, address_filter) @@ -330,6 +330,9 @@ class Database: q.generate_addresses_using_filters, best_height, allowed_gap, address_manager ) + async def get_raw_transactions(self, **constraints): + return await self.run(q.get_raw_transactions, **constraints) + async def get_transactions(self, **constraints) -> Result[Transaction]: return await self.fetch_result(q.get_transactions, **constraints) diff --git a/lbry/db/queries/base.py b/lbry/db/queries/base.py index cde19d2f5..d6a2a93e7 100644 --- a/lbry/db/queries/base.py +++ b/lbry/db/queries/base.py @@ -33,8 +33,11 @@ def get_best_block_height(): return context().fetchmax(Block.c.height, -1) -def insert_block(block): - context().get_bulk_loader().add_block(block).flush(return_row_count_for_table=None) +def insert_blocks(blocks): + loader = context().get_bulk_loader() + for block in blocks: + loader.add_block(block) + loader.flush(return_row_count_for_table=None) def get_block_headers(first, last=None): diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index 6a80e61b4..3eae78e03 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -277,9 +277,9 @@ def select_transactions(cols, account_ids=None, **constraints): TXO_NOT_MINE = Output(None, None, is_my_output=False) -def get_raw_transactions(tx_hashes): +def get_raw_transactions(tx_hash__in): return context().fetchall( - select(TX.c.tx_hash, TX.c.raw).where(TX.c.tx_hash.in_(tx_hashes)) + select(TX.c.tx_hash, TX.c.raw).where(TX.c.tx_hash.in_(tx_hash__in)) ) diff --git a/lbry/service/api.py b/lbry/service/api.py index 9f312c839..b12c44bf5 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -2635,6 +2635,7 @@ class API: async def transaction_search( self, txids: StrOrList, # transaction ids to find + raw: bool = False, # raw tx ) -> Dict[str, str]: """ Search for transaction(s) in the entire blockchain. @@ -2643,7 +2644,22 @@ class API: transaction_search ... """ - return await self.service.search_transactions(txids) + return await self.service.search_transactions(txids, raw) + + async def transaction_broadcast( + self, + tx: str, # transaction to broadcast + ) -> str: + """ + Broadcast transaction(s) to the blockchain. + + Usage: + transaction_broadcast ... + + """ + return await self.service.broadcast( + Transaction(unhexlify(tx)) + ) TXO_DOC = """ List and sum transaction outputs. @@ -3431,7 +3447,7 @@ class Client(API): async def connect(self): self.session = ClientSession() - self.ws = await self.session.ws_connect(self.url) + self.ws = await self.session.ws_connect(self.url, max_msg_size=20*1024*1024) self.receive_messages_task = asyncio.create_task(self.receive_messages()) async def disconnect(self): diff --git a/lbry/service/base.py b/lbry/service/base.py index 0af9228b8..09a162a83 100644 --- a/lbry/service/base.py +++ b/lbry/service/base.py @@ -149,7 +149,7 @@ class Service: # await self.ledger.maybe_verify_transaction(tx, height, merkle) # return tx - async def search_transactions(self, txids): + async def search_transactions(self, txids, raw: bool = False): raise NotImplementedError async def sum_supports( diff --git a/lbry/service/full_node.py b/lbry/service/full_node.py index ead0d13fb..7bd39f41e 100644 --- a/lbry/service/full_node.py +++ b/lbry/service/full_node.py @@ -53,10 +53,14 @@ class FullNode(Service): # for f in await self.db.get_block_address_filters() # } - async def search_transactions(self, txids): + async def search_transactions(self, txids, raw: bool = False): tx_hashes = [unhexlify(txid)[::-1] for txid in txids] + if raw: + return { + hexlify(tx['tx_hash'][::-1]).decode(): hexlify(tx['raw']).decode() + for tx in await self.db.get_raw_transactions(tx_hash__in=tx_hashes) + } return { - #hexlify(tx['tx_hash'][::-1]).decode(): hexlify(tx['raw']).decode() tx.id: hexlify(tx.raw).decode() for tx in await self.db.get_transactions(tx_hash__in=tx_hashes) } diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 9c166b1c2..eb957cd30 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -40,8 +40,8 @@ class LightClient(Service): await super().stop() await self.client.disconnect() - async def search_transactions(self, txids): - return await self.client.transaction_search(txids=txids) + async def search_transactions(self, txids, raw: bool = False): + return await self.client.first.transaction_search(txids=txids, raw=raw) async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0): return await self.client.first.address_filter( @@ -49,7 +49,7 @@ class LightClient(Service): ) async def broadcast(self, tx): - pass + return await self.client.first.transaction_broadcast(tx=hexlify(tx.raw).decode()) async def wait(self, tx: Transaction, height=-1, timeout=1): pass @@ -82,33 +82,44 @@ class FilterManager: self.cache = {} async def download_and_save_filters(self, needed_filters): - for factor, start, end in needed_filters: - print(f'=> address_filter(granularity={factor}, start_height={start}, end_height={end})') - if factor > 3: - print('skipping') - continue - filters = await self.client.first.address_filter( - granularity=factor, start_height=start, end_height=end - ) - print(f'<= address_filter(granularity={factor}, start_height={start}, end_height={end})') + for factor, filter_start, filter_end in needed_filters: + print(f'loop, factor: {factor}, filter start: {filter_start}, filter end: {filter_end}') if factor == 0: + filters = await self.client.first.address_filter( + granularity=factor, start_height=filter_start, end_height=filter_end + ) + print(f'tx_filters: {len(filters)}') for tx_filter in filters: await self.db.insert_tx_filter( - unhexlify(tx_filter["txid"])[::-1], tx_filter["height"], unhexlify(tx_filter["filter"]) + unhexlify(tx_filter["txid"])[::-1], tx_filter["height"], + unhexlify(tx_filter["filter"]) ) else: - for block_filter in filters: - await self.db.insert_block_filter( - block_filter["height"], factor, unhexlify(block_filter["filter"]) + if factor > 1: + step = 10**factor + else: + step = 1 + for start in range(filter_start, filter_end+1, step): + print(f'=> address_filter(granularity={factor}, start_height={start})') + filters = await self.client.first.address_filter( + granularity=factor, start_height=start ) + print(f'<= address_filter(granularity={factor}, start_height={start})') + for block_filter in filters: + await self.db.insert_block_filter( + block_filter["height"], factor, unhexlify(block_filter["filter"]) + ) async def download_and_save_txs(self, tx_hashes): if not tx_hashes: return txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes] - txs = await self.client.first.transaction_search(txids=txids) + print(f'=> transaction_search(len(txids): {len(txids)})') + txs = await self.client.first.transaction_search(txids=txids, raw=True) + print(f' @ transaction_search(len(txids): {len(txids)})') for raw_tx in txs.values(): await self.db.insert_transaction(None, Transaction(unhexlify(raw_tx))) + print(f' # transaction_search(len(txids): {len(txids)})') async def download_initial_filters(self, best_height): missing = await self.db.get_missing_required_filters(best_height) @@ -142,9 +153,11 @@ class FilterManager: for wallet in wallets: for account in wallet.accounts: for address_manager in account.address_managers.values(): + print(f'get_missing_tx_for_addresses({account.id})') missing = await self.db.get_missing_tx_for_addresses( (account.id, address_manager.chain_number) ) + print(f' len(missing): {len(missing)}') await self.download_and_save_txs(missing) async def download(self, best_height: int, wallets: WalletManager): @@ -206,13 +219,12 @@ class BlockHeaderManager: async def download(self, best_height): print('downloading blocks...') our_height = await self.db.get_best_block_height() - print(f'=> block_list(start_height={our_height+1}, end_height={best_height})') - blocks = await self.client.first.block_list(start_height=our_height+1, end_height=best_height) - print(f'<= block_list(start_height={our_height+1}, end_height={best_height})') - for block in blocks: - if block["height"] % 10000 == 0 or block["height"] < 2: - print(f'block {block["height"]}') - await self.db.insert_block(Block( + for start in range(our_height+1, best_height, 10000): + end = min(start+9999, best_height) + print(f'=> block_list(start_height={start}, end_height={end})') + blocks = await self.client.first.block_list(start_height=start, end_height=end) + print(f'<= block_list(start_height={start}, end_height={end})') + await self.db.insert_blocks([Block( height=block["height"], version=0, file_number=0, @@ -224,7 +236,7 @@ class BlockHeaderManager: bits=0, # block["bits"], nonce=0, # block["nonce"], txs=[] - )) + ) for block in blocks]) async def get_header(self, height): blocks = await self.client.first.block_list(height=height) diff --git a/lbry/testcase.py b/lbry/testcase.py index 1750c1e6e..b407bbde8 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -255,7 +255,7 @@ class AsyncUnitDBTestCase(AsyncioTestCase): async def add(self, block_or_tx: Union[Block, Transaction], block_hash: Optional[bytes] = None): if isinstance(block_or_tx, Block): - await self.db.insert_block(block_or_tx) + await self.db.insert_blocks([block_or_tx]) for tx in block_or_tx.txs: self.outputs.extend(tx.outputs) return block_or_tx