forked from LBRYCommunity/lbry-sdk
batched blockchain.address.subscribe
This commit is contained in:
parent
0ee7870bdf
commit
38b108752e
3 changed files with 36 additions and 13 deletions
|
@ -425,6 +425,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
|
|
||||||
async def subscribe_accounts(self):
|
async def subscribe_accounts(self):
|
||||||
if self.network.is_connected and self.accounts:
|
if self.network.is_connected and self.accounts:
|
||||||
|
log.info("Subscribe to %i accounts", len(self.accounts))
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
self.subscribe_account(a) for a in self.accounts
|
self.subscribe_account(a) for a in self.accounts
|
||||||
])
|
])
|
||||||
|
@ -444,15 +445,32 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
AddressesGeneratedEvent(address_manager, addresses)
|
AddressesGeneratedEvent(address_manager, addresses)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str]):
|
async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000):
|
||||||
if self.network.is_connected and addresses:
|
if self.network.is_connected and addresses:
|
||||||
await asyncio.wait([
|
addresses_remaining = list(addresses)
|
||||||
self.subscribe_address(address_manager, address) for address in addresses
|
retries = 0
|
||||||
])
|
while addresses_remaining:
|
||||||
|
batch = addresses_remaining[:batch_size]
|
||||||
async def subscribe_address(self, address_manager: AddressManager, address: str):
|
try:
|
||||||
remote_status = await self.network.subscribe_address(address)
|
results = await self.network.rpc('blockchain.address.subscribe', batch, True)
|
||||||
self._update_tasks.add(self.update_history(address, remote_status, address_manager))
|
for address, remote_status in zip(batch, results):
|
||||||
|
self._update_tasks.add(self.update_history(address, remote_status, address_manager))
|
||||||
|
retries = 0
|
||||||
|
addresses_remaining = addresses_remaining[batch_size:]
|
||||||
|
log.info("subscribed to %i/%i addresses", len(addresses) - len(addresses_remaining), len(addresses))
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
if retries >= 3:
|
||||||
|
log.warning(
|
||||||
|
"timed out subscribing to addresses from %s:%i",
|
||||||
|
*self.network.client.server_address_and_port
|
||||||
|
)
|
||||||
|
# abort and cancel, we can't lose a subscription, it will happen again on reconnect
|
||||||
|
if self.network.client:
|
||||||
|
self.network.client.abort()
|
||||||
|
raise asyncio.CancelledError()
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
retries += 1
|
||||||
|
log.info("finished subscribing to %i addresses", len(addresses))
|
||||||
|
|
||||||
def process_status_update(self, update):
|
def process_status_update(self, update):
|
||||||
address, remote_status = update
|
address, remote_status = update
|
||||||
|
|
|
@ -256,9 +256,10 @@ class Network:
|
||||||
def subscribe_headers(self):
|
def subscribe_headers(self):
|
||||||
return self.rpc('blockchain.headers.subscribe', [True], True)
|
return self.rpc('blockchain.headers.subscribe', [True], True)
|
||||||
|
|
||||||
async def subscribe_address(self, address):
|
async def subscribe_address(self, address, *addresses):
|
||||||
|
addresses = list((address, ) + addresses)
|
||||||
try:
|
try:
|
||||||
return await self.rpc('blockchain.address.subscribe', [address], True)
|
return await self.rpc('blockchain.address.subscribe', addresses, True)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
# abort and cancel, we can't lose a subscription, it will happen again on reconnect
|
# abort and cancel, we can't lose a subscription, it will happen again on reconnect
|
||||||
if self.client:
|
if self.client:
|
||||||
|
|
|
@ -1141,12 +1141,16 @@ class LBRYElectrumX(SessionBase):
|
||||||
hashX = self.address_to_hashX(address)
|
hashX = self.address_to_hashX(address)
|
||||||
return await self.hashX_listunspent(hashX)
|
return await self.hashX_listunspent(hashX)
|
||||||
|
|
||||||
async def address_subscribe(self, address):
|
async def address_subscribe(self, *addresses):
|
||||||
"""Subscribe to an address.
|
"""Subscribe to an address.
|
||||||
|
|
||||||
address: the address to subscribe to"""
|
address: the address to subscribe to"""
|
||||||
hashX = self.address_to_hashX(address)
|
if len(addresses) > 1000:
|
||||||
return await self.hashX_subscribe(hashX, address)
|
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
||||||
|
hashXes = [
|
||||||
|
(self.address_to_hashX(address), address) for address in addresses
|
||||||
|
]
|
||||||
|
return await asyncio.gather(*(self.hashX_subscribe(*args) for args in hashXes))
|
||||||
|
|
||||||
async def address_unsubscribe(self, address):
|
async def address_unsubscribe(self, address):
|
||||||
"""Unsubscribe an address.
|
"""Unsubscribe an address.
|
||||||
|
|
Loading…
Add table
Reference in a new issue