diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index 232a66481..24caf5aa5 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -96,12 +96,9 @@ class DatabaseComponent(Component): self.storage = None -HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest" -HEADER_SIZE = 112 - - class HeadersComponent(Component): component_name = HEADERS_COMPONENT + HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest" def __init__(self, component_manager): super().__init__(component_manager) @@ -137,23 +134,19 @@ class HeadersComponent(Component): async def fetch_headers_from_s3(self): local_header_size = self.headers.bytes_size resume_header = {"Range": f"bytes={local_header_size}-"} - async with utils.aiohttp_request('get', HEADERS_URL, headers=resume_header) as response: - if response.status == 406 or response.content_length < HEADER_SIZE: # our file is bigger + async with utils.aiohttp_request('get', self.HEADERS_URL, headers=resume_header) as response: + if response.status == 406 or response.content_length < self.headers.header_size: # our file is bigger log.warning("s3 is more out of date than we are") return - if response.content_length % HEADER_SIZE != 0: + if response.content_length % self.headers.header_size != 0: log.warning("s3 appears to have corrupted header") return final_size_after_download = response.content_length + local_header_size if local_header_size > 0: log.info("Resuming download of %i bytes from s3", response.content_length) - buffer, header_size = b'', self.headers.header_size - async for chunk in response.content.iter_any(): - chunk = buffer + chunk - remaining = len(chunk) % header_size - chunk, buffer = chunk[:-remaining], bytes(chunk[-remaining:]) - if not chunk: - continue + while not response.content.at_eof(): + max_read = min(self.headers.header_size * 10000, final_size_after_download - self.headers.bytes_size) + chunk = await response.content.readexactly(max_read) if not await self.headers.connect(len(self.headers), chunk): log.warning("Error connecting downloaded headers from at %s.", self.headers.height) return @@ -167,13 +160,11 @@ class HeadersComponent(Component): return 0 async def get_downloadable_header_height(self) -> typing.Optional[int]: - try: - async with utils.aiohttp_request('HEAD', HEADERS_URL) as response: - if response.status != 200: - log.warning("Header download error, unexpected response code: %s", response.status) - return response.content_length // HEADER_SIZE - except OSError: - log.exception("Failed to download headers using https.") + async with utils.aiohttp_request('HEAD', self.HEADERS_URL) as response: + if response.status != 200: + log.warning("Header download error, unexpected response code: %s", response.status) + return -1 + return response.content_length // self.headers.header_size async def should_download_headers_from_s3(self) -> bool: if self.conf.blockchain_name != "lbrycrd_main": @@ -196,17 +187,17 @@ class HeadersComponent(Component): if os.path.exists(self.old_file): log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file) os.rename(self.old_file, self.headers_file) - await self.headers.open() - await self.headers.repair() - if await self.should_download_headers_from_s3(): - try: + try: + await self.headers.open() + await self.headers.repair() + if await self.should_download_headers_from_s3(): self.is_downloading_headers = True await self.fetch_headers_from_s3() - except Exception as err: - log.error("failed to fetch headers from s3: %s", err) - finally: - self.is_downloading_headers = False - await self.headers.close() + except Exception as err: + log.error("failed to fetch headers from s3: %s", err) + finally: + self.is_downloading_headers = False + await self.headers.close() async def stop(self): pass diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index 03dd4e027..b2e9c779d 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -1,9 +1,11 @@ import asyncio +import os -from torba.client.basenetwork import ClientSession -from torba.orchstr8 import Conductor -from torba.testcase import IntegrationTestCase import lbry.wallet +from lbry.testcase import CommandTestCase +from lbry.extras.daemon.Components import HeadersComponent +from torba.client.basenetwork import ClientSession +from torba.testcase import IntegrationTestCase class TestSessionBloat(IntegrationTestCase): @@ -29,3 +31,22 @@ class TestSessionBloat(IntegrationTestCase): await session.send_request('server.banner', ()) self.assertTrue(session.is_closing()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0) + + +class TestHeadersComponent(CommandTestCase): + + LEDGER = lbry.wallet + + async def asyncSetUp(self): + await super().asyncSetUp() + self.component_manager = self.daemon.component_manager + self.component_manager.conf.blockchain_name = 'lbrycrd_main' + self.headers_component = HeadersComponent(self.component_manager) + + async def test_cant_reach_host(self): + HeadersComponent.HEADERS_URL = 'notthere/' + os.unlink(self.headers_component.headers.path) + # test is that this doesnt raise + await self.headers_component.start() + self.assertTrue(self.component_manager.get_components_status()['blockchain_headers']) + self.assertEqual(await self.headers_component.get_status(), {}) \ No newline at end of file diff --git a/torba/torba/client/baseheader.py b/torba/torba/client/baseheader.py index 339dd5e54..22cc7aef6 100644 --- a/torba/torba/client/baseheader.py +++ b/torba/torba/client/baseheader.py @@ -115,7 +115,7 @@ class BaseHeaders: # .seek()/.write()/.truncate() might also .flush() when needed # the goal here is mainly to ensure we're definitely flush()'ing self.io.flush() - self._size = None + self._size = self.io.tell() // self.header_size added += written if bail: break