diff --git a/lbry/wallet/header.py b/lbry/wallet/header.py index c5af445aa..97956d54d 100644 --- a/lbry/wallet/header.py +++ b/lbry/wallet/header.py @@ -2,13 +2,11 @@ import base64 import os import struct import asyncio -import hashlib import logging import zlib from concurrent.futures.thread import ThreadPoolExecutor from io import BytesIO -from contextlib import asynccontextmanager from typing import Optional, Iterator, Tuple, Callable from binascii import hexlify, unhexlify @@ -36,7 +34,7 @@ class Headers: max_target = 0x0000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff genesis_hash = b'9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463' target_timespan = 150 - checkpoint = (600_000, b'100b33ca3d0b86a48f0d6d6f30458a130ecb89d5affefe4afccb134d5a40f4c2') + checkpoints = HASHES first_block_timestamp = 1466646588 # block 1, as 0 is off by a lot timestamp_average_offset = 160.6855883050695 # calculated at 733447 @@ -113,8 +111,6 @@ class Headers: return True async def get(self, height) -> dict: - if height < 0: - raise IndexError(f"Height cannot be negative!!") if isinstance(height, slice): raise NotImplementedError("Slicing of header chain has not been implemented yet.") try: @@ -128,6 +124,8 @@ class Headers: async def get_raw_header(self, height) -> bytes: if self.chunk_getter: await self.ensure_chunk_at(height) + if not 0 <= height <= self.height: + raise IndexError(f"{height} is out of bounds, current height: {self.height}") return await asyncio.get_running_loop().run_in_executor(self.executor, self._read, height) def _read(self, height, count=1): @@ -139,7 +137,8 @@ class Headers: return self.hash_header(self.io.read(count * self.header_size)).decode() async def ensure_tip(self): - await self.ensure_chunk_at(max(HASHES.keys())) + if self.checkpoints: + await self.ensure_chunk_at(max(self.checkpoints.keys())) async def ensure_chunk_at(self, height): if await self.has_header(height): @@ -152,12 +151,12 @@ class Headers: zlib.decompress(base64.b64decode(headers['base64']), wbits=-15, bufsize=600_000) ) chunk_hash = self.hash_header(chunk).decode() - if HASHES.get(start) == chunk_hash: + if self.checkpoints.get(start) == chunk_hash: return await asyncio.get_running_loop().run_in_executor(self.executor, self._write, start, chunk) - elif start not in HASHES: + elif start not in self.checkpoints: return # todo: fixme raise Exception( - f"Checkpoint mismatch at height {start}. Expected {HASHES[start]}, but got {chunk_hash} instead." + f"Checkpoint mismatch at height {start}. Expected {self.checkpoints[start]}, but got {chunk_hash} instead." ) async def has_header(self, height): @@ -186,33 +185,6 @@ class Headers: return b'0' * 64 return hexlify(double_sha256(header)[::-1]) - @asynccontextmanager - async def checkpointed_connector(self): - buf = BytesIO() - try: - yield buf - finally: - await asyncio.sleep(0) - final_height = len(self) + buf.tell() // self.header_size - verifiable_bytes = (self.checkpoint[0] - len(self)) * self.header_size if self.checkpoint else 0 - if verifiable_bytes > 0 and final_height >= self.checkpoint[0]: - buf.seek(0) - self.io.seek(0) - h = hashlib.sha256() - h.update(self.io.read()) - h.update(buf.read(verifiable_bytes)) - if h.hexdigest().encode() == self.checkpoint[1]: - buf.seek(0) - self._write(len(self), buf.read(verifiable_bytes)) - remaining = buf.read() - buf.seek(0) - buf.write(remaining) - buf.truncate() - else: - log.warning("Checkpoint mismatch, connecting headers through slow method.") - if buf.tell() > 0: - await self.connect(len(self), buf.getvalue()) - async def connect(self, start: int, headers: bytes) -> int: added = 0 bail = False @@ -223,7 +195,8 @@ class Headers: except InvalidHeader as e: bail = True chunk = chunk[:(height-e.height)*self.header_size] - added += self._write(height, chunk) if chunk else 0 + if chunk: + added += await asyncio.get_running_loop().run_in_executor(self.executor, self._write, height, chunk) if bail: break return added @@ -235,14 +208,15 @@ class Headers: # .seek()/.write()/.truncate() might also .flush() when needed # the goal here is mainly to ensure we're definitely flush()'ing self.io.flush() - self._size = self.io.seek(0, os.SEEK_END) // self.header_size + self._size = self.io.tell() // self.header_size return written async def validate_chunk(self, height, chunk): previous_hash, previous_header, previous_previous_header = None, None, None if height > 0: - previous_header = await self.get(height-1) - previous_hash = await self.hash(height-1) + raw = await self.get_raw_header(height-1) + previous_header = self.deserialize(height-1, raw) + previous_hash = self.hash_header(raw) if height > 1: previous_previous_header = await self.get(height-2) chunk_target = self.get_next_chunk_target(height // 2016 - 1) @@ -345,3 +319,4 @@ class UnvalidatedHeaders(Headers): validate_difficulty = False max_target = 0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff genesis_hash = b'6e3fcf1299d4ec5d79c3a4c91d624a4acf9e2e173d95a1a0504f677669687556' + checkpoints = {} diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index cfd14ef0a..24d9b1e71 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -125,14 +125,14 @@ class FileCommands(CommandTestCase): file_list = await self.file_list() self.assertEqual( file_list[0]['timestamp'], - None + self.ledger.headers.estimated_timestamp(file_list[0]['height']) ) self.assertEqual(file_list[0]['confirmations'], -1) await self.daemon.jsonrpc_resolve('foo') file_list = await self.file_list() self.assertEqual( file_list[0]['timestamp'], - self.ledger.headers[file_list[0]['height']]['timestamp'] + self.ledger.headers.estimated_timestamp(file_list[0]['height']) ) self.assertEqual(file_list[0]['confirmations'], 1) diff --git a/tests/unit/wallet/test_headers.py b/tests/unit/wallet/test_headers.py index 968766725..1d1589648 100644 --- a/tests/unit/wallet/test_headers.py +++ b/tests/unit/wallet/test_headers.py @@ -19,6 +19,7 @@ class TestHeaders(AsyncioTestCase): self.maxDiff = None h = Headers(':memory:') h.io.write(HEADERS) + await h.open() self.assertEqual(await h.get(0), { 'bits': 520159231, 'block_height': 0, @@ -140,23 +141,6 @@ class TestHeaders(AsyncioTestCase): await headers.connect(len(headers), HEADERS[block_bytes(8):]) self.assertEqual(19, headers.height) - async def test_checkpointed_writer(self): - headers = Headers(':memory:') - await headers.open() - getblocks = lambda start, end: HEADERS[block_bytes(start):block_bytes(end)] - headers.checkpoint = 10, hexlify(sha256(getblocks(10, 11))) - async with headers.checkpointed_connector() as buff: - buff.write(getblocks(0, 10)) - self.assertEqual(len(headers), 10) - async with headers.checkpointed_connector() as buff: - buff.write(getblocks(10, 19)) - self.assertEqual(len(headers), 19) - headers = Headers(':memory:') - await headers.open() - async with headers.checkpointed_connector() as buff: - buff.write(getblocks(0, 19)) - self.assertEqual(len(headers), 19) - async def test_concurrency(self): BLOCKS = 19 headers_temporary_file = tempfile.mktemp() @@ -168,7 +152,7 @@ class TestHeaders(AsyncioTestCase): await headers.connect(block_index, HEADERS[block_bytes(block_index):block_bytes(block_index + 1)]) async def reader(): for block_index in range(BLOCKS): - while len(headers) < block_index: + while len(headers) <= block_index: await asyncio.sleep(0.000001) assert (await headers.get(block_index))['block_height'] == block_index reader_task = asyncio.create_task(reader())