forked from LBRYCommunity/lbry-sdk
fix
This commit is contained in:
parent
fce80374f4
commit
fc1a06bc45
1 changed files with 35 additions and 18 deletions
|
@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._on_transaction_controller = StreamController()
|
self._on_transaction_controller = StreamController()
|
||||||
self.on_transaction = self._on_transaction_controller.stream
|
self.on_transaction = self._on_transaction_controller.stream
|
||||||
self.on_transaction.listen(
|
self.on_transaction.listen(
|
||||||
lambda e: log.info(
|
lambda e: log.debug(
|
||||||
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
|
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
|
||||||
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
|
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
|
||||||
)
|
)
|
||||||
|
@ -520,9 +520,13 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
if cache_item is None:
|
if cache_item is None:
|
||||||
cache_item = TransactionCacheItem()
|
cache_item = TransactionCacheItem()
|
||||||
self._tx_cache[txid] = cache_item
|
self._tx_cache[txid] = cache_item
|
||||||
|
if len(acquire_lock_tasks) > 10000:
|
||||||
|
await asyncio.wait(acquire_lock_tasks)
|
||||||
|
acquire_lock_tasks.clear()
|
||||||
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
|
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
|
||||||
|
|
||||||
await asyncio.wait(acquire_lock_tasks)
|
if acquire_lock_tasks:
|
||||||
|
await asyncio.wait(acquire_lock_tasks)
|
||||||
|
|
||||||
tx_indexes = {}
|
tx_indexes = {}
|
||||||
|
|
||||||
|
@ -546,11 +550,10 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
to_request[i] = (txid, remote_height)
|
to_request[i] = (txid, remote_height)
|
||||||
|
|
||||||
log.info("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address)
|
log.info("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address)
|
||||||
requested_txes = await self._request_transaction_batch(to_request)
|
requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
|
||||||
for tx in requested_txes:
|
for tx in requested_txes:
|
||||||
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
||||||
synced_txs.append(tx)
|
synced_txs.append(tx)
|
||||||
log.info("synced %i/%i transactions for %s", len(synced_txs), len(remote_history), address)
|
|
||||||
|
|
||||||
assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}"
|
assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}"
|
||||||
synced_history = ""
|
synced_history = ""
|
||||||
|
@ -573,14 +576,11 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
log.warning("lock was already released?")
|
log.warning("lock was already released?")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
log.info("updating address count and status")
|
||||||
await self.db.save_transaction_io_batch(
|
await self.db.save_transaction_io_batch(
|
||||||
synced_txs, address, self.address_to_hash160(address), synced_history
|
[], address, self.address_to_hash160(address), synced_history
|
||||||
)
|
)
|
||||||
|
log.info("updated address count and status")
|
||||||
await asyncio.wait([
|
|
||||||
self._on_transaction_controller.add(TransactionEvent(address, tx))
|
|
||||||
for tx in synced_txs
|
|
||||||
])
|
|
||||||
|
|
||||||
if address_manager is None:
|
if address_manager is None:
|
||||||
address_manager = await self.get_address_manager_for_address(address)
|
address_manager = await self.get_address_manager_for_address(address)
|
||||||
|
@ -623,6 +623,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._known_addresses_out_of_sync.add(address)
|
self._known_addresses_out_of_sync.add(address)
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
|
log.info("synced %s", address)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def cache_transaction(self, txid, remote_height, check_local=True):
|
async def cache_transaction(self, txid, remote_height, check_local=True):
|
||||||
|
@ -676,23 +677,29 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
tx.position = merkle['pos']
|
tx.position = merkle['pos']
|
||||||
tx.is_verified = merkle_root == header['merkle_root']
|
tx.is_verified = merkle_root == header['merkle_root']
|
||||||
|
|
||||||
async def _request_transaction_batch(self, to_request):
|
async def _request_transaction_batch(self, to_request, remote_history_size, address):
|
||||||
header_cache = {}
|
header_cache = {}
|
||||||
batches = [[]]
|
batches = [[]]
|
||||||
remote_heights = {}
|
remote_heights = {}
|
||||||
synced_txs = []
|
synced_txs = []
|
||||||
|
heights_in_batch = 1
|
||||||
for idx in sorted(to_request):
|
for idx in sorted(to_request):
|
||||||
txid = to_request[idx][0]
|
txid = to_request[idx][0]
|
||||||
height = to_request[idx][1]
|
height = to_request[idx][1]
|
||||||
remote_heights[txid] = height
|
remote_heights[txid] = height
|
||||||
if len(batches[-1]) == 1:
|
if idx > 1 and height != remote_heights[batches[-1][-1]]:
|
||||||
|
heights_in_batch += 1
|
||||||
|
if len(batches[-1]) == 100 or heights_in_batch == 10:
|
||||||
batches.append([])
|
batches.append([])
|
||||||
|
heights_in_batch = 1
|
||||||
batches[-1].append(txid)
|
batches[-1].append(txid)
|
||||||
if not len(batches[-1]):
|
if not len(batches[-1]):
|
||||||
batches.pop()
|
batches.pop()
|
||||||
|
|
||||||
|
last_showed_synced_count = 0
|
||||||
|
|
||||||
async def _single_batch(batch):
|
async def _single_batch(batch):
|
||||||
|
this_batch_synced = []
|
||||||
batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch)
|
batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch)
|
||||||
for txid, (raw, merkle) in batch_result.items():
|
for txid, (raw, merkle) in batch_result.items():
|
||||||
remote_height = remote_heights[txid]
|
remote_height = remote_heights[txid]
|
||||||
|
@ -740,13 +747,23 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
if referenced_txo is not None:
|
if referenced_txo is not None:
|
||||||
txi.txo_ref = referenced_txo.ref
|
txi.txo_ref = referenced_txo.ref
|
||||||
synced_txs.append(tx)
|
synced_txs.append(tx)
|
||||||
|
this_batch_synced.append(tx)
|
||||||
if batches:
|
await self.db.save_transaction_io_batch(
|
||||||
|
this_batch_synced, address, self.address_to_hash160(address), ""
|
||||||
|
)
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
asyncio.create_task(
|
self._on_transaction_controller.add(TransactionEvent(address, tx))
|
||||||
_single_batch(_batch)
|
for tx in this_batch_synced
|
||||||
) for _batch in batches
|
|
||||||
])
|
])
|
||||||
|
nonlocal last_showed_synced_count
|
||||||
|
if last_showed_synced_count + 100 < len(synced_txs):
|
||||||
|
log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
|
||||||
|
last_showed_synced_count = len(synced_txs)
|
||||||
|
|
||||||
|
await asyncio.wait(
|
||||||
|
[_single_batch(batch) for batch in batches]
|
||||||
|
)
|
||||||
|
log.info("finished syncing history for %s", address)
|
||||||
return synced_txs
|
return synced_txs
|
||||||
|
|
||||||
async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
|
async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
|
||||||
|
|
Loading…
Add table
Reference in a new issue