This commit is contained in:
Jack Robison 2020-07-14 21:08:46 -04:00 committed by Lex Berezhny
parent 76e60d9bc3
commit 2d8703bb8d

View file

@ -3,7 +3,6 @@ import copy
import time import time
import asyncio import asyncio
import logging import logging
from io import StringIO
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from operator import itemgetter from operator import itemgetter
@ -490,7 +489,8 @@ class Ledger(metaclass=LedgerRegistry):
address, remote_status = update address, remote_status = update
self._update_tasks.add(self.update_history(address, remote_status)) self._update_tasks.add(self.update_history(address, remote_status))
async def update_history(self, address, remote_status, address_manager: AddressManager = None, reattempt_update: bool = True): async def update_history(self, address, remote_status, address_manager: AddressManager = None,
reattempt_update: bool = True):
async with self._address_update_locks[address]: async with self._address_update_locks[address]:
self._known_addresses_out_of_sync.discard(address) self._known_addresses_out_of_sync.discard(address)
local_status, local_history = await self.get_local_status_and_history(address) local_status, local_history = await self.get_local_status_and_history(address)
@ -557,19 +557,23 @@ class Ledger(metaclass=LedgerRegistry):
# continue # continue
to_request[i] = (txid, remote_height) to_request[i] = (txid, remote_height)
log.debug("request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs), len(remote_history), address) log.debug(
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs),
len(remote_history), address
)
requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address) requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
for tx in requested_txes: for tx in requested_txes:
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:" pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
synced_txs.append(tx) synced_txs.append(tx)
assert len(pending_synced_history) == len(remote_history), f"{len(pending_synced_history)} vs {len(remote_history)}\n{remote_history}\n{pending_synced_history}" assert len(pending_synced_history) == len(remote_history), \
f"{len(pending_synced_history)} vs {len(remote_history)}"
synced_history = "" synced_history = ""
for remote_i, i in zip(range(len(remote_history)), sorted(pending_synced_history.keys())): for remote_i, i in zip(range(len(remote_history)), sorted(pending_synced_history.keys())):
assert i == remote_i, f"{i} vs {remote_i}" assert i == remote_i, f"{i} vs {remote_i}"
txid, height = remote_history[remote_i] txid, height = remote_history[remote_i]
if f"{txid}:{height}:" != pending_synced_history[i]: if f"{txid}:{height}:" != pending_synced_history[i]:
log.warning("history mismatch: %s:%i: vs %s", remote_history[remote_i], pending_synced_history[i]) log.warning("history mismatch: %s vs %s", remote_history[remote_i], pending_synced_history[i])
synced_history += pending_synced_history[i] synced_history += pending_synced_history[i]
cache_size = self.config.get("tx_cache_size", 100_000) cache_size = self.config.get("tx_cache_size", 100_000)
@ -582,7 +586,6 @@ class Ledger(metaclass=LedgerRegistry):
cache_item.lock.release() cache_item.lock.release()
except RuntimeError: except RuntimeError:
log.warning("lock was already released?") log.warning("lock was already released?")
pass
await self.db.save_transaction_io_batch( await self.db.save_transaction_io_batch(
[], address, self.address_to_hash160(address), synced_history [], address, self.address_to_hash160(address), synced_history
@ -724,7 +727,7 @@ class Ledger(metaclass=LedgerRegistry):
try: try:
header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height)) header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height))
except IndexError: except IndexError:
log.warning("failed to verify %s at height %i", tx.id, merkle_height) log.warning("failed to verify %s at height %i", tx.id, merkle_height)
else: else:
header_cache[remote_heights[txid]] = header header_cache[remote_heights[txid]] = header
tx.position = merkle['pos'] tx.position = merkle['pos']