diff --git a/lbry/wallet/header.py b/lbry/wallet/header.py index 1375d5442..6af943e3b 100644 --- a/lbry/wallet/header.py +++ b/lbry/wallet/header.py @@ -47,6 +47,8 @@ class Headers: self._size: Optional[int] = None self.chunk_getter: Optional[Callable] = None self.executor = ThreadPoolExecutor(1) + self.known_missing_checkpointed_chunks = set() + self.check_chunk_lock = asyncio.Lock() async def open(self): if not self.executor: @@ -57,6 +59,8 @@ class Headers: else: self.io = open(self.path, 'r+b') self._size = self.io.seek(0, os.SEEK_END) // self.header_size + await self.ensure_checkpointed_size() + await self.get_all_missing_headers() async def close(self): if self.executor: @@ -140,14 +144,19 @@ class Headers: self.io.seek(start * self.header_size, os.SEEK_SET) return self.hash_header(self.io.read(count * self.header_size)).decode() - async def ensure_tip(self): - if self.checkpoints: - await self.ensure_chunk_at(max(self.checkpoints.keys())) + async def ensure_checkpointed_size(self): + max_checkpointed_height = max(self.checkpoints.keys() or [-1]) + if self.height < max_checkpointed_height: + self._write(max_checkpointed_height, bytes([0] * self.header_size * 1000)) async def ensure_chunk_at(self, height): - if await self.has_header(height): - log.debug("has header %s", height) - return + async with self.check_chunk_lock: + if await self.has_header(height): + log.debug("has header %s", height) + return + return await self.fetch_chunk(height) + + async def fetch_chunk(self, height): log.info("on-demand fetching height %s", height) start = (height // 1000) * 1000 headers = await self.chunk_getter(start) # pylint: disable=not-callable @@ -156,7 +165,10 @@ class Headers: ) chunk_hash = self.hash_header(chunk).decode() if self.checkpoints.get(start) == chunk_hash: - return await asyncio.get_running_loop().run_in_executor(self.executor, self._write, start, chunk) + await asyncio.get_running_loop().run_in_executor(self.executor, self._write, start, chunk) + if start in self.known_missing_checkpointed_chunks: + self.known_missing_checkpointed_chunks.remove(start) + return elif start not in self.checkpoints: return # todo: fixme raise Exception( @@ -164,12 +176,27 @@ class Headers: ) async def has_header(self, height): + normalized_height = (height // 1000) * 1000 + if normalized_height in self.checkpoints: + return normalized_height not in self.known_missing_checkpointed_chunks + def _has_header(height): empty = '56944c5d3f98413ef45cf54545538103cc9f298e0575820ad3591376e2e0f65d' all_zeroes = '789d737d4f448e554b318c94063bbfa63e9ccda6e208f5648ca76ee68896557b' return self.chunk_hash(height, 1) not in (empty, all_zeroes) return await asyncio.get_running_loop().run_in_executor(self.executor, _has_header, height) + async def get_all_missing_headers(self): + # Heavy operation done in one optimized shot + def _io_checkall(): + for chunk_height, expected_hash in reversed(list(self.checkpoints.items())): + if chunk_height in self.known_missing_checkpointed_chunks: + continue + if self.chunk_hash(chunk_height, 1000) != expected_hash: + self.known_missing_checkpointed_chunks.add(chunk_height) + return self.known_missing_checkpointed_chunks + return await asyncio.get_running_loop().run_in_executor(self.executor, _io_checkall) + @property def height(self) -> int: return len(self)-1 diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 0e648789b..907aec8ba 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -344,15 +344,13 @@ class Ledger(metaclass=LedgerRegistry): return max(self.headers.height, self._download_height) async def initial_headers_sync(self): - target = self.network.remote_height + 1 get_chunk = partial(self.network.retriable_call, self.network.get_headers, count=1000, b64=True) self.headers.chunk_getter = get_chunk async def doit(): - for height in reversed(range(0, target)): - async with self._header_processing_lock: + async with self._header_processing_lock: + for height in reversed(sorted(self.headers.known_missing_checkpointed_chunks)): await self.headers.ensure_chunk_at(height) - await self.headers.ensure_tip() self._update_tasks.add(doit()) await self.update_headers() diff --git a/tests/unit/wallet/test_headers.py b/tests/unit/wallet/test_headers.py index 1d1589648..52dfc2117 100644 --- a/tests/unit/wallet/test_headers.py +++ b/tests/unit/wallet/test_headers.py @@ -1,12 +1,15 @@ import os import asyncio import tempfile -from binascii import hexlify, unhexlify +from binascii import unhexlify -from lbry.crypto.hash import sha256 from lbry.wallet.util import ArithUint256 from lbry.testcase import AsyncioTestCase -from lbry.wallet.ledger import Headers +from lbry.wallet.ledger import Headers as _Headers + + +class Headers(_Headers): + checkpoints = {} def block_bytes(blocks):