client sync fixes
This commit is contained in:
parent
d274145328
commit
0620cf78b2
8 changed files with 75 additions and 37 deletions
|
@ -260,8 +260,8 @@ class Database:
|
||||||
async def get_missing_tx_for_addresses(self, address_manager):
|
async def get_missing_tx_for_addresses(self, address_manager):
|
||||||
return await self.run(q.get_missing_tx_for_addresses, address_manager)
|
return await self.run(q.get_missing_tx_for_addresses, address_manager)
|
||||||
|
|
||||||
async def insert_block(self, block):
|
async def insert_blocks(self, blocks):
|
||||||
return await self.run(q.insert_block, block)
|
return await self.run(q.insert_blocks, blocks)
|
||||||
|
|
||||||
async def insert_block_filter(self, height: int, factor: int, address_filter: bytes):
|
async def insert_block_filter(self, height: int, factor: int, address_filter: bytes):
|
||||||
return await self.run(q.insert_block_filter, height, factor, address_filter)
|
return await self.run(q.insert_block_filter, height, factor, address_filter)
|
||||||
|
@ -330,6 +330,9 @@ class Database:
|
||||||
q.generate_addresses_using_filters, best_height, allowed_gap, address_manager
|
q.generate_addresses_using_filters, best_height, allowed_gap, address_manager
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_raw_transactions(self, **constraints):
|
||||||
|
return await self.run(q.get_raw_transactions, **constraints)
|
||||||
|
|
||||||
async def get_transactions(self, **constraints) -> Result[Transaction]:
|
async def get_transactions(self, **constraints) -> Result[Transaction]:
|
||||||
return await self.fetch_result(q.get_transactions, **constraints)
|
return await self.fetch_result(q.get_transactions, **constraints)
|
||||||
|
|
||||||
|
|
|
@ -33,8 +33,11 @@ def get_best_block_height():
|
||||||
return context().fetchmax(Block.c.height, -1)
|
return context().fetchmax(Block.c.height, -1)
|
||||||
|
|
||||||
|
|
||||||
def insert_block(block):
|
def insert_blocks(blocks):
|
||||||
context().get_bulk_loader().add_block(block).flush(return_row_count_for_table=None)
|
loader = context().get_bulk_loader()
|
||||||
|
for block in blocks:
|
||||||
|
loader.add_block(block)
|
||||||
|
loader.flush(return_row_count_for_table=None)
|
||||||
|
|
||||||
|
|
||||||
def get_block_headers(first, last=None):
|
def get_block_headers(first, last=None):
|
||||||
|
|
|
@ -277,9 +277,9 @@ def select_transactions(cols, account_ids=None, **constraints):
|
||||||
TXO_NOT_MINE = Output(None, None, is_my_output=False)
|
TXO_NOT_MINE = Output(None, None, is_my_output=False)
|
||||||
|
|
||||||
|
|
||||||
def get_raw_transactions(tx_hashes):
|
def get_raw_transactions(tx_hash__in):
|
||||||
return context().fetchall(
|
return context().fetchall(
|
||||||
select(TX.c.tx_hash, TX.c.raw).where(TX.c.tx_hash.in_(tx_hashes))
|
select(TX.c.tx_hash, TX.c.raw).where(TX.c.tx_hash.in_(tx_hash__in))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2635,6 +2635,7 @@ class API:
|
||||||
async def transaction_search(
|
async def transaction_search(
|
||||||
self,
|
self,
|
||||||
txids: StrOrList, # transaction ids to find
|
txids: StrOrList, # transaction ids to find
|
||||||
|
raw: bool = False, # raw tx
|
||||||
) -> Dict[str, str]:
|
) -> Dict[str, str]:
|
||||||
"""
|
"""
|
||||||
Search for transaction(s) in the entire blockchain.
|
Search for transaction(s) in the entire blockchain.
|
||||||
|
@ -2643,7 +2644,22 @@ class API:
|
||||||
transaction_search <txid>...
|
transaction_search <txid>...
|
||||||
|
|
||||||
"""
|
"""
|
||||||
return await self.service.search_transactions(txids)
|
return await self.service.search_transactions(txids, raw)
|
||||||
|
|
||||||
|
async def transaction_broadcast(
|
||||||
|
self,
|
||||||
|
tx: str, # transaction to broadcast
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Broadcast transaction(s) to the blockchain.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
transaction_broadcast <tx>...
|
||||||
|
|
||||||
|
"""
|
||||||
|
return await self.service.broadcast(
|
||||||
|
Transaction(unhexlify(tx))
|
||||||
|
)
|
||||||
|
|
||||||
TXO_DOC = """
|
TXO_DOC = """
|
||||||
List and sum transaction outputs.
|
List and sum transaction outputs.
|
||||||
|
@ -3431,7 +3447,7 @@ class Client(API):
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
self.session = ClientSession()
|
self.session = ClientSession()
|
||||||
self.ws = await self.session.ws_connect(self.url)
|
self.ws = await self.session.ws_connect(self.url, max_msg_size=20*1024*1024)
|
||||||
self.receive_messages_task = asyncio.create_task(self.receive_messages())
|
self.receive_messages_task = asyncio.create_task(self.receive_messages())
|
||||||
|
|
||||||
async def disconnect(self):
|
async def disconnect(self):
|
||||||
|
|
|
@ -149,7 +149,7 @@ class Service:
|
||||||
# await self.ledger.maybe_verify_transaction(tx, height, merkle)
|
# await self.ledger.maybe_verify_transaction(tx, height, merkle)
|
||||||
# return tx
|
# return tx
|
||||||
|
|
||||||
async def search_transactions(self, txids):
|
async def search_transactions(self, txids, raw: bool = False):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
async def sum_supports(
|
async def sum_supports(
|
||||||
|
|
|
@ -53,10 +53,14 @@ class FullNode(Service):
|
||||||
# for f in await self.db.get_block_address_filters()
|
# for f in await self.db.get_block_address_filters()
|
||||||
# }
|
# }
|
||||||
|
|
||||||
async def search_transactions(self, txids):
|
async def search_transactions(self, txids, raw: bool = False):
|
||||||
tx_hashes = [unhexlify(txid)[::-1] for txid in txids]
|
tx_hashes = [unhexlify(txid)[::-1] for txid in txids]
|
||||||
|
if raw:
|
||||||
|
return {
|
||||||
|
hexlify(tx['tx_hash'][::-1]).decode(): hexlify(tx['raw']).decode()
|
||||||
|
for tx in await self.db.get_raw_transactions(tx_hash__in=tx_hashes)
|
||||||
|
}
|
||||||
return {
|
return {
|
||||||
#hexlify(tx['tx_hash'][::-1]).decode(): hexlify(tx['raw']).decode()
|
|
||||||
tx.id: hexlify(tx.raw).decode()
|
tx.id: hexlify(tx.raw).decode()
|
||||||
for tx in await self.db.get_transactions(tx_hash__in=tx_hashes)
|
for tx in await self.db.get_transactions(tx_hash__in=tx_hashes)
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,8 +40,8 @@ class LightClient(Service):
|
||||||
await super().stop()
|
await super().stop()
|
||||||
await self.client.disconnect()
|
await self.client.disconnect()
|
||||||
|
|
||||||
async def search_transactions(self, txids):
|
async def search_transactions(self, txids, raw: bool = False):
|
||||||
return await self.client.transaction_search(txids=txids)
|
return await self.client.first.transaction_search(txids=txids, raw=raw)
|
||||||
|
|
||||||
async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0):
|
async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0):
|
||||||
return await self.client.first.address_filter(
|
return await self.client.first.address_filter(
|
||||||
|
@ -49,7 +49,7 @@ class LightClient(Service):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def broadcast(self, tx):
|
async def broadcast(self, tx):
|
||||||
pass
|
return await self.client.first.transaction_broadcast(tx=hexlify(tx.raw).decode())
|
||||||
|
|
||||||
async def wait(self, tx: Transaction, height=-1, timeout=1):
|
async def wait(self, tx: Transaction, height=-1, timeout=1):
|
||||||
pass
|
pass
|
||||||
|
@ -82,33 +82,44 @@ class FilterManager:
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
|
|
||||||
async def download_and_save_filters(self, needed_filters):
|
async def download_and_save_filters(self, needed_filters):
|
||||||
for factor, start, end in needed_filters:
|
for factor, filter_start, filter_end in needed_filters:
|
||||||
print(f'=> address_filter(granularity={factor}, start_height={start}, end_height={end})')
|
print(f'loop, factor: {factor}, filter start: {filter_start}, filter end: {filter_end}')
|
||||||
if factor > 3:
|
|
||||||
print('skipping')
|
|
||||||
continue
|
|
||||||
filters = await self.client.first.address_filter(
|
|
||||||
granularity=factor, start_height=start, end_height=end
|
|
||||||
)
|
|
||||||
print(f'<= address_filter(granularity={factor}, start_height={start}, end_height={end})')
|
|
||||||
if factor == 0:
|
if factor == 0:
|
||||||
|
filters = await self.client.first.address_filter(
|
||||||
|
granularity=factor, start_height=filter_start, end_height=filter_end
|
||||||
|
)
|
||||||
|
print(f'tx_filters: {len(filters)}')
|
||||||
for tx_filter in filters:
|
for tx_filter in filters:
|
||||||
await self.db.insert_tx_filter(
|
await self.db.insert_tx_filter(
|
||||||
unhexlify(tx_filter["txid"])[::-1], tx_filter["height"], unhexlify(tx_filter["filter"])
|
unhexlify(tx_filter["txid"])[::-1], tx_filter["height"],
|
||||||
|
unhexlify(tx_filter["filter"])
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
for block_filter in filters:
|
if factor > 1:
|
||||||
await self.db.insert_block_filter(
|
step = 10**factor
|
||||||
block_filter["height"], factor, unhexlify(block_filter["filter"])
|
else:
|
||||||
|
step = 1
|
||||||
|
for start in range(filter_start, filter_end+1, step):
|
||||||
|
print(f'=> address_filter(granularity={factor}, start_height={start})')
|
||||||
|
filters = await self.client.first.address_filter(
|
||||||
|
granularity=factor, start_height=start
|
||||||
)
|
)
|
||||||
|
print(f'<= address_filter(granularity={factor}, start_height={start})')
|
||||||
|
for block_filter in filters:
|
||||||
|
await self.db.insert_block_filter(
|
||||||
|
block_filter["height"], factor, unhexlify(block_filter["filter"])
|
||||||
|
)
|
||||||
|
|
||||||
async def download_and_save_txs(self, tx_hashes):
|
async def download_and_save_txs(self, tx_hashes):
|
||||||
if not tx_hashes:
|
if not tx_hashes:
|
||||||
return
|
return
|
||||||
txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes]
|
txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes]
|
||||||
txs = await self.client.first.transaction_search(txids=txids)
|
print(f'=> transaction_search(len(txids): {len(txids)})')
|
||||||
|
txs = await self.client.first.transaction_search(txids=txids, raw=True)
|
||||||
|
print(f' @ transaction_search(len(txids): {len(txids)})')
|
||||||
for raw_tx in txs.values():
|
for raw_tx in txs.values():
|
||||||
await self.db.insert_transaction(None, Transaction(unhexlify(raw_tx)))
|
await self.db.insert_transaction(None, Transaction(unhexlify(raw_tx)))
|
||||||
|
print(f' # transaction_search(len(txids): {len(txids)})')
|
||||||
|
|
||||||
async def download_initial_filters(self, best_height):
|
async def download_initial_filters(self, best_height):
|
||||||
missing = await self.db.get_missing_required_filters(best_height)
|
missing = await self.db.get_missing_required_filters(best_height)
|
||||||
|
@ -142,9 +153,11 @@ class FilterManager:
|
||||||
for wallet in wallets:
|
for wallet in wallets:
|
||||||
for account in wallet.accounts:
|
for account in wallet.accounts:
|
||||||
for address_manager in account.address_managers.values():
|
for address_manager in account.address_managers.values():
|
||||||
|
print(f'get_missing_tx_for_addresses({account.id})')
|
||||||
missing = await self.db.get_missing_tx_for_addresses(
|
missing = await self.db.get_missing_tx_for_addresses(
|
||||||
(account.id, address_manager.chain_number)
|
(account.id, address_manager.chain_number)
|
||||||
)
|
)
|
||||||
|
print(f' len(missing): {len(missing)}')
|
||||||
await self.download_and_save_txs(missing)
|
await self.download_and_save_txs(missing)
|
||||||
|
|
||||||
async def download(self, best_height: int, wallets: WalletManager):
|
async def download(self, best_height: int, wallets: WalletManager):
|
||||||
|
@ -206,13 +219,12 @@ class BlockHeaderManager:
|
||||||
async def download(self, best_height):
|
async def download(self, best_height):
|
||||||
print('downloading blocks...')
|
print('downloading blocks...')
|
||||||
our_height = await self.db.get_best_block_height()
|
our_height = await self.db.get_best_block_height()
|
||||||
print(f'=> block_list(start_height={our_height+1}, end_height={best_height})')
|
for start in range(our_height+1, best_height, 10000):
|
||||||
blocks = await self.client.first.block_list(start_height=our_height+1, end_height=best_height)
|
end = min(start+9999, best_height)
|
||||||
print(f'<= block_list(start_height={our_height+1}, end_height={best_height})')
|
print(f'=> block_list(start_height={start}, end_height={end})')
|
||||||
for block in blocks:
|
blocks = await self.client.first.block_list(start_height=start, end_height=end)
|
||||||
if block["height"] % 10000 == 0 or block["height"] < 2:
|
print(f'<= block_list(start_height={start}, end_height={end})')
|
||||||
print(f'block {block["height"]}')
|
await self.db.insert_blocks([Block(
|
||||||
await self.db.insert_block(Block(
|
|
||||||
height=block["height"],
|
height=block["height"],
|
||||||
version=0,
|
version=0,
|
||||||
file_number=0,
|
file_number=0,
|
||||||
|
@ -224,7 +236,7 @@ class BlockHeaderManager:
|
||||||
bits=0, # block["bits"],
|
bits=0, # block["bits"],
|
||||||
nonce=0, # block["nonce"],
|
nonce=0, # block["nonce"],
|
||||||
txs=[]
|
txs=[]
|
||||||
))
|
) for block in blocks])
|
||||||
|
|
||||||
async def get_header(self, height):
|
async def get_header(self, height):
|
||||||
blocks = await self.client.first.block_list(height=height)
|
blocks = await self.client.first.block_list(height=height)
|
||||||
|
|
|
@ -255,7 +255,7 @@ class AsyncUnitDBTestCase(AsyncioTestCase):
|
||||||
|
|
||||||
async def add(self, block_or_tx: Union[Block, Transaction], block_hash: Optional[bytes] = None):
|
async def add(self, block_or_tx: Union[Block, Transaction], block_hash: Optional[bytes] = None):
|
||||||
if isinstance(block_or_tx, Block):
|
if isinstance(block_or_tx, Block):
|
||||||
await self.db.insert_block(block_or_tx)
|
await self.db.insert_blocks([block_or_tx])
|
||||||
for tx in block_or_tx.txs:
|
for tx in block_or_tx.txs:
|
||||||
self.outputs.extend(tx.outputs)
|
self.outputs.extend(tx.outputs)
|
||||||
return block_or_tx
|
return block_or_tx
|
||||||
|
|
Loading…
Add table
Reference in a new issue